diff --git a/plugins/emqttd_auth_mysql/rebar.config b/plugins/emqttd_auth_mysql/rebar.config deleted file mode 100644 index 2e9d5c953..000000000 --- a/plugins/emqttd_auth_mysql/rebar.config +++ /dev/null @@ -1,19 +0,0 @@ -%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- -%% ex: ts=4 sw=4 ft=erlang et - -{require_min_otp_vsn, "R17"}. - -%% fail_on_warning, -{erl_opts, [debug_info, {parse_transform, lager_transform}]}. - -{erl_opts, [warn_export_all, - warn_unused_import, - {i, "include"}, - {src_dirs, ["src"]}]}. - -{xref_checks, [undefined_function_calls]}. - -{deps, [ - {'Emysql', "*", {git, "git://github.com/Eonblast/Emysql.git", {branch, "master"}}} -]}. - diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl index a6d5ce5a8..d9f8bb710 100644 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl +++ b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl @@ -45,7 +45,8 @@ check(#mqtt_client{username = undefined}, _Password, _State) -> check(_Client, undefined, _State) -> {error, "Password undefined"}; check(#mqtt_client{username = Username}, Password, #state{user_tab = UserTab}) -> - case emysql:select(UserTab, {{username, Username}, {password, erlang:md5(Password)}}) of + %%TODO: hash password... + case emysql:select(UserTab, {{username, Username}, {password, Password}}) of {ok, []} -> {error, "Username or Password not match"}; {ok, _Record} -> ok end. diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl index e1f16bf98..80b3f37f2 100644 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl +++ b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl @@ -1,3 +1,29 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% mysql authentication app. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqttd_auth_mysql_app). -behaviour(application). @@ -11,7 +37,8 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_auth_mysql_sup:start_link(), - emqttd_access_control:register_mod(auth, emqttd_auth_mysql, []), + Env = application:get_all_env(), + emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env), {ok, Sup}. stop(_State) -> diff --git a/plugins/emysql/README.md b/plugins/emysql/README.md new file mode 100644 index 000000000..18b61598c --- /dev/null +++ b/plugins/emysql/README.md @@ -0,0 +1,42 @@ +# emysql + +Erlang MySQL client + +## config + +``` + +``` + +## Select API + +* emyssql:select(tab). +* emysql:select({tab, [col1,col2]}). +* emysql:select({tab, [col1, col2], {id,1}}). +* emysql:select(Query, Load). + +## Update API + +* emysql:update(tab, [{Field1, Val}, {Field2, Val2}], {id, 1}). + +## Insert API + +* emysql:insert(tab, [{Field1, Val}, {Field2, Val2}]). + +## Delete API + +* emysql:delete(tab, {name, Name}]). + +## Query API + +* emysql:sqlquery("select * from tab;"). + +## Prepare API + +* emysql:prepare(find_with_id, "select * from tab where id = ?;"). +* emysql:execute(find_with_id, [Id]). +* emysql:unprepare(find_with_id). + +## MySQL Client Protocal + +* http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol diff --git a/plugins/emysql/include/emysql.hrl b/plugins/emysql/include/emysql.hrl new file mode 100755 index 000000000..fde65fe7a --- /dev/null +++ b/plugins/emysql/include/emysql.hrl @@ -0,0 +1,2 @@ +%% MySQL result record: +-record(mysql_result, {fieldinfo = [], rows = [], affectedrows = 0, insert_id =0, error = ""}). diff --git a/plugins/emysql/src/emysql.app.src b/plugins/emysql/src/emysql.app.src new file mode 100644 index 000000000..d494fd954 --- /dev/null +++ b/plugins/emysql/src/emysql.app.src @@ -0,0 +1,14 @@ +{application, emysql, + [{description, "Erlang MySQL Driver"}, + {vsn, "1.0"}, + {modules, [ + emysql, + emysql_app, + emysql_sup, + emysql_auth, + emysql_conn, + emysql_recv]}, + {registered, []}, + {applications, [kernel, stdlib, sasl, crypto]}, + {env, []}, + {mod, {emysql_app, []}}]}. diff --git a/plugins/emysql/src/emysql.erl b/plugins/emysql/src/emysql.erl new file mode 100644 index 000000000..bc7cd352f --- /dev/null +++ b/plugins/emysql/src/emysql.erl @@ -0,0 +1,508 @@ +%%%---------------------------------------------------------------------- +%%% File : emysql.erl +%%% Author : Ery Lee +%%% Purpose : Mysql access api. +%%% Created : 19 May 2009 +%%% License : http://www.opengoss.com +%%% +%%% Copyright (C) 2012, www.opengoss.com +%%%---------------------------------------------------------------------- +-module(emysql). + +-author('ery.lee@gmail.com'). + +-include("emysql.hrl"). + +-export([start_link/1]). + +-ifdef(use_specs). + +-spec(conns/0 :: () -> list()). + +-endif. + +%command functions +-export([info/0, + pool/1, + conns/0]). + +%sql functions +-export([insert/2, + insert/3, + select/1, + select/2, + select/3, + update/2, + update/3, + delete/1, + delete/2, + truncate/1, + prepare/2, + execute/1, + execute/2, + unprepare/1, + sqlquery/1, + sqlquery/2]). + +-behavior(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {ids}). + +%% External exports +-export([encode/1, + encode/2, + escape/1, + escape_like/1]). + +start_link(PoolSize) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [PoolSize], []). + +info() -> + [emysql_conn:info(Pid) || Pid <- + pg2:get_local_members(emysql_conn)]. + +%pool pool +pool(Id) -> + gen_server:cast(?MODULE, {pool, Id}). + +conns() -> + gen_server:call(?MODULE, conns). + +insert(Tab, Record) when is_atom(Tab) -> + sqlquery(encode_insert(Tab, Record)). + +insert(_Tab, _Fields, Values) when length(Values) == 0 -> + {updated, {0, 0}}; + +insert(Tab, Fields, Values) when length(Values) > 0 -> + sqlquery(encode_insert(Tab, Fields, Values)). + +encode_insert(Tab, Record) -> + {Fields, Values} = lists:unzip([{atom_to_list(F), encode(V)} + || {F, V} <- Record]), + ["insert into ", atom_to_list(Tab), "(", + string:join(Fields, ","), ") values(", + string:join(Values, ","), ");"]. + +encode_insert(Tab, Fields, Rows) -> + Encode = fun(Row) -> string:join([encode(V) || V <- Row], ",") end, + Rows1 = [lists:concat(["(", Encode(Row), ")"]) || Row <- Rows], + ["insert into ", atom_to_list(Tab), "(", + string:join([atom_to_list(F) || F <- Fields], ","), + ") values", string:join(Rows1, ","), ";"]. + +select(Tab) when is_atom(Tab) -> + sqlquery(encode_select(Tab)); + +select(Select) when is_tuple(Select) -> + sqlquery(encode_select(Select)). + +select(Tab, Where) when is_atom(Tab) and is_tuple(Where) -> + sqlquery(encode_select({Tab, Where})); + +select(Tab, Fields) when is_atom(Tab) and is_list(Fields) -> + sqlquery(encode_select({Tab, Fields})); + +select(Select, Load) when is_tuple(Select) and is_integer(Load) -> + sqlquery(encode_select(Select), Load). + +select(Tab, Fields, Where) when is_atom(Tab) + and is_list(Fields) and is_tuple(Where) -> + sqlquery(encode_select({Tab, Fields, Where})). + +encode_select(Tab) when is_atom(Tab) -> + encode_select({Tab, ['*'], undefined}); + +encode_select({Tab, Fields}) when is_atom(Tab) + and is_list(Fields) -> + encode_select({Tab, Fields, undefined}); + +encode_select({Tab, Where}) when is_atom(Tab) + and is_tuple(Where) -> + encode_select({Tab, ['*'], Where}); + +encode_select({Tab, Fields, undefined}) when is_atom(Tab) + and is_list(Fields) -> + ["select ", encode_fields(Fields), " from ", atom_to_list(Tab), ";"]; + +encode_select({Tab, Fields, Where}) when is_atom(Tab) + and is_list(Fields) and is_tuple(Where) -> + ["select ", encode_fields(Fields), " from ", + atom_to_list(Tab), " where ", encode_where(Where), ";"]. + +encode_fields(Fields) -> + string:join([atom_to_list(F) || F <- Fields], " ,"). + +update(Tab, Record) when is_atom(Tab) + and is_list(Record) -> + case proplists:get_value(id, Record) of + undefined -> + Updates = string:join([encode_column(Col) || Col <- Record], ","), + Query = ["update ", atom_to_list(Tab), " set ", Updates, ";"], + sqlquery(Query); + Id -> + update(Tab, lists:keydelete(id, 1, Record), {id, Id}) + end. + +update(Tab, Record, Where) -> + Update = string:join([encode_column(Col) || Col <- Record], ","), + Query = ["update ", atom_to_list(Tab), " set ", Update, + " where ", encode_where(Where), ";"], + sqlquery(Query). + +encode_column({F, V}) when is_atom(F) -> + lists:concat([atom_to_list(F), "=", encode(V)]). + +delete(Tab) when is_atom(Tab) -> + sqlquery(["delete from ", atom_to_list(Tab), ";"]). + +delete(Tab, Id) when is_atom(Tab) + and is_integer(Id) -> + Query = ["delete from ", atom_to_list(Tab), + " where ", encode_where({id, Id})], + sqlquery(Query); + +delete(Tab, Where) when is_atom(Tab) + and is_tuple(Where) -> + Query = ["delete from ", atom_to_list(Tab), + " where ", encode_where(Where)], + sqlquery(Query). + +truncate(Tab) when is_atom(Tab) -> + sqlquery(["truncate table ", atom_to_list(Tab), ";"]). + +sqlquery(Query) -> + sqlquery(Query, 1). + +sqlquery(Query, Load) -> + with_next_conn(fun(Conn) -> + case catch mysql_to_odbc(emysql_conn:sqlquery(Conn, iolist_to_binary(Query))) of + {selected, NewFields, Records} -> + {ok, to_tuple_records(NewFields, Records)}; + {error, Reason} -> + {error, Reason}; + Res -> + Res + end + end, Load). + +prepare(Name, Stmt) when is_list(Stmt) -> + prepare(Name, list_to_binary(Stmt)); + +prepare(Name, Stmt) when is_binary(Stmt) -> + with_all_conns(fun(Conn) -> + emysql_conn:prepare(Conn, Name, Stmt) + end). + +execute(Name) -> + execute(Name, []). + +execute(Name, Params) -> + with_next_conn(fun(Conn) -> + case catch mysql_to_odbc(emysql_conn:execute(Conn, Name, Params)) of + {selected, NewFields, Records} -> + {ok, to_tuple_records(NewFields, Records)}; + {error, Reason} -> + {error, Reason}; + Res -> + Res + end + end, 1). + +unprepare(Name) -> + with_all_conns(fun(Conn) -> + emysql_conn:unprepare(Conn, Name) + end). + +with_next_conn(Fun, _Load) -> + Fun(pg2:get_closest_pid(emysql_conn)). + +with_all_conns(Fun) -> + [Fun(Pid) || Pid <- pg2:get_local_members(emysql_conn)]. + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([PoolSize]) -> + Ids = lists:seq(1, PoolSize), + [put(Id, 0) || Id <- Ids], + [put({count, Id}, 0) || Id <- Ids], + {ok, #state{ids = Ids}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- + +handle_call(info, _From, State) -> + Reply = [{conn, Id, Pid, get(Id), get({total, Id})} + || {Id, Pid} <- get_all_conns()], + {reply, Reply, State}; + +handle_call({next_conn, Load}, _From, #state{ids = Ids} = State) -> + {ConnId, ConnLoad} = + lists:foldl(fun(Id, {MinId, MinLoad}) -> + ThisLoad = get(Id), + if + ThisLoad =< MinLoad -> {Id, ThisLoad}; + true -> {MinId, MinLoad} + end + end, {undefined, 16#ffffffff}, Ids), + Reply = + case ConnId of + undefined -> + undefined; + _ -> + ConnPid = get_conn_pid(ConnId), + put(ConnId, ConnLoad+Load), + Count = get({total, ConnId}), + put({total, ConnId}, Count+1), + {ConnId, ConnPid} + end, + {reply, Reply, State}; + +handle_call(conns, _From, State) -> + Conns = get_all_conns(), + {reply, Conns, State}; + +handle_call(Req, From, State) -> + gen_server:reply(From, {badcall, Req}), + {stop, {badcall, Req}, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast({pool, Id}, State) -> + put(Id, 0), + put({total, Id}, 0), + {noreply, State}; + +handle_cast({done, ConnId, Load}, State) -> + put(ConnId, get(ConnId) - Load), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badcast, Msg}, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +get_conn_pid(CId) -> + [{CId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= CId -> false; + (_) -> true + end, + supervisor:which_children(emysql_sup)), + Pid. + +get_all_conns() -> + [{Id, Pid} || {Id, Pid, _Type, _Modules} <- + supervisor:which_children(emysql_sup), is_integer(Id)]. + +%% Convert MySQL query result to Erlang ODBC result formalism +mysql_to_odbc({updated, #mysql_result{affectedrows=AffectedRows, insert_id = InsertId} = _MySQLRes}) -> + {updated, {AffectedRows, InsertId}}; + +mysql_to_odbc({data, #mysql_result{fieldinfo = FieldInfo, rows=AllRows} = _MySQLRes}) -> + mysql_item_to_odbc(FieldInfo, AllRows); + +mysql_to_odbc({error, MySQLRes}) when is_list(MySQLRes) -> + {error, MySQLRes}; + +mysql_to_odbc({error, #mysql_result{error=Reason} = _MySQLRes}) -> + {error, Reason}; + +mysql_to_odbc({error, Reason}) -> + {error, Reason}. + +%% When tabular data is returned, convert it to the ODBC formalism +mysql_item_to_odbc(Columns, Recs) -> + %% For now, there is a bug and we do not get the correct value from MySQL + %% module: + {selected, + [element(2, Column) || Column <- Columns], + [list_to_tuple(Rec) || Rec <- Recs]}. + +%%internal functions +encode_where({'and', L, R}) -> + encode_where(L) ++ " and " ++ encode_where(R); + +encode_where({'and', List}) when is_list(List) -> + string:join([encode_where(E) || E <- List], " and "); + +encode_where({'or', L, R}) -> + encode_where(L) ++ " or " ++ encode_where(R); + +encode_where({'or', List}) when is_list(List) -> + string:join([encode_where(E) || E <- List], " or "); + +encode_where({like, Field, Value}) -> + atom_to_list(Field) ++ " like " ++ encode(Value); + +encode_where({'<', Field, Value}) -> + atom_to_list(Field) ++ " < " ++ encode(Value); + +encode_where({'>', Field, Value}) -> + atom_to_list(Field) ++ " > " ++ encode(Value); + +encode_where({'in', Field, Values}) -> + InStr = string:join([encode(Value) || Value <- Values], ","), + atom_to_list(Field) ++ " in (" ++ InStr ++ ")"; + +encode_where({Field, Value}) -> + atom_to_list(Field) ++ " = " ++ encode(Value). + +to_tuple_records(_Fields, []) -> + []; + +to_tuple_records(Fields, Records) -> + [to_tuple_record(Fields, tuple_to_list(Record)) || Record <- Records]. + +to_tuple_record(Fields, Record) when length(Fields) == length(Record) -> + to_tuple_record(Fields, Record, []). + +to_tuple_record([], [], Acc) -> + Acc; + +to_tuple_record([_F|FT], [undefined|VT], Acc) -> + to_tuple_record(FT, VT, Acc); + +to_tuple_record([F|FT], [V|VT], Acc) -> + to_tuple_record(FT, VT, [{list_to_atom(binary_to_list(F)), V} | Acc]). + +%% Escape character that will confuse an SQL engine +%% Percent and underscore only need to be escaped for pattern matching like +%% statement +escape_like(S) when is_list(S) -> + [escape_like(C) || C <- S]; +escape_like($%) -> "\\%"; +escape_like($_) -> "\\_"; +escape_like(C) -> escape(C). + +%% Escape character that will confuse an SQL engine +escape(S) when is_list(S) -> + [escape(C) || C <- S]; +%% Characters to escape +escape($\0) -> "\\0"; +escape($\n) -> "\\n"; +escape($\t) -> "\\t"; +escape($\b) -> "\\b"; +escape($\r) -> "\\r"; +escape($') -> "\\'"; +escape($") -> "\\\""; +escape($\\) -> "\\\\"; +escape(C) -> C. + +encode(Val) -> + encode(Val, false). +encode(Val, false) when Val == undefined; Val == null -> + "NULL"; +encode(Val, true) when Val == undefined; Val == null -> + <<"NULL">>; +encode(Val, false) when is_binary(Val) -> + binary_to_list(quote(Val)); +encode(Val, true) when is_binary(Val) -> + quote(Val); +encode(Val, true) -> + list_to_binary(encode(Val,false)); +encode(Val, false) when is_atom(Val) -> + quote(atom_to_list(Val)); +encode(Val, false) when is_list(Val) -> + quote(Val); +encode(Val, false) when is_integer(Val) -> + integer_to_list(Val); +encode(Val, false) when is_float(Val) -> + [Res] = io_lib:format("~w", [Val]), + Res; +encode({datetime, Val}, AsBinary) -> + encode(Val, AsBinary); +encode({{Year, Month, Day}, {Hour, Minute, Second}}, false) -> + Res = two_digits([Year, Month, Day, Hour, Minute, Second]), + lists:flatten(Res); +encode({TimeType, Val}, AsBinary) + when TimeType == 'date'; + TimeType == 'time' -> + encode(Val, AsBinary); +encode({Time1, Time2, Time3}, false) -> + Res = two_digits([Time1, Time2, Time3]), + lists:flatten(Res); +encode(Val, _AsBinary) -> + {error, {unrecognized_value, Val}}. + +two_digits(Nums) when is_list(Nums) -> + [two_digits(Num) || Num <- Nums]; +two_digits(Num) -> + [Str] = io_lib:format("~b", [Num]), + case length(Str) of + 1 -> [$0 | Str]; + _ -> Str + end. + +%% Quote a string or binary value so that it can be included safely in a +%% MySQL query. +quote(String) when is_list(String) -> + [39 | lists:reverse([39 | quote(String, [])])]; %% 39 is $' +quote(Bin) when is_binary(Bin) -> + list_to_binary(quote(binary_to_list(Bin))). + +quote([], Acc) -> + Acc; +quote([0 | Rest], Acc) -> + quote(Rest, [$0, $\\ | Acc]); +quote([10 | Rest], Acc) -> + quote(Rest, [$n, $\\ | Acc]); +quote([13 | Rest], Acc) -> + quote(Rest, [$r, $\\ | Acc]); +quote([$\\ | Rest], Acc) -> + quote(Rest, [$\\ , $\\ | Acc]); +quote([39 | Rest], Acc) -> %% 39 is $' + quote(Rest, [39, $\\ | Acc]); %% 39 is $' +quote([34 | Rest], Acc) -> %% 34 is $" + quote(Rest, [34, $\\ | Acc]); %% 34 is $" +quote([26 | Rest], Acc) -> + quote(Rest, [$Z, $\\ | Acc]); +quote([C | Rest], Acc) -> + quote(Rest, [C | Acc]). + diff --git a/plugins/emysql/src/emysql_app.erl b/plugins/emysql/src/emysql_app.erl new file mode 100644 index 000000000..4be2e70db --- /dev/null +++ b/plugins/emysql/src/emysql_app.erl @@ -0,0 +1,27 @@ +%%%---------------------------------------------------------------------- +%%% File : emysql_app.erl +%%% Author : Ery Lee +%%% Purpose : mysql driver application +%%% Created : 21 May 2009 +%%% Updated : 11 Jan 2010 +%%% License : http://www.opengoss.com +%%% +%%% Copyright (C) 2007-2010, www.opengoss.com +%%%---------------------------------------------------------------------- +-module(emysql_app). + +-author('ery.lee@gmail.com'). + +-behavior(application). + +-export([start/0, start/2, stop/1]). + +start() -> + application:start(emysql). + +start(normal, _Args) -> + emysql_sup:start_link(application:get_all_env()). + +stop(_) -> + ok. + diff --git a/plugins/emysql/src/emysql_auth.erl b/plugins/emysql/src/emysql_auth.erl new file mode 100644 index 000000000..72aebfd3b --- /dev/null +++ b/plugins/emysql/src/emysql_auth.erl @@ -0,0 +1,102 @@ +-module(emysql_auth). + +-export([make_auth/2, make_new_auth/3, password_old/2, password_new/2]). + +%%-------------------------------------------------------------------- +%% Macros +%%-------------------------------------------------------------------- +-define(LONG_PASSWORD, 1). +-define(LONG_FLAG, 4). +-define(PROTOCOL_41, 512). +-define(TRANSACTIONS, 8192). +-define(SECURE_CONNECTION, 32768). +-define(CONNECT_WITH_DB, 8). +-define(MAX_PACKET_SIZE, 1000000). + +password_old(Password, Salt) -> + {P1, P2} = hash(Password), + {S1, S2} = hash(Salt), + Seed1 = P1 bxor S1, + Seed2 = P2 bxor S2, + List = rnd(9, Seed1, Seed2), + {L, [Extra]} = lists:split(8, List), + list_to_binary(lists:map(fun (E) -> E bxor (Extra - 64) end, L)). + +%% part of do_old_auth/4, which is part of mysql_init/4 +make_auth(User, Password) -> + Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS, + Maxsize = 0, + UserB = list_to_binary(User), + PasswordB = Password, + <>. + +%% part of do_new_auth/4, which is part of mysql_init/4 +make_new_auth(User, Password, Database) -> + DBCaps = case Database of + none -> + 0; + _ -> + ?CONNECT_WITH_DB + end, + Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS bor + ?PROTOCOL_41 bor ?SECURE_CONNECTION bor DBCaps, + Maxsize = ?MAX_PACKET_SIZE, + UserB = list_to_binary(User), + PasswordL = size(Password), + DatabaseB = case Database of + none -> + <<>>; + _ -> + list_to_binary(Database) + end, + <>. + +hash(S) -> + hash(S, 1345345333, 305419889, 7). + +hash([C | S], N1, N2, Add) -> + N1_1 = N1 bxor (((N1 band 63) + Add) * C + N1 * 256), + N2_1 = N2 + ((N2 * 256) bxor N1_1), + Add_1 = Add + C, + hash(S, N1_1, N2_1, Add_1); +hash([], N1, N2, _Add) -> + Mask = (1 bsl 31) - 1, + {N1 band Mask , N2 band Mask}. + +rnd(N, Seed1, Seed2) -> + Mod = (1 bsl 30) - 1, + rnd(N, [], Seed1 rem Mod, Seed2 rem Mod). + +rnd(0, List, _, _) -> + lists:reverse(List); +rnd(N, List, Seed1, Seed2) -> + Mod = (1 bsl 30) - 1, + NSeed1 = (Seed1 * 3 + Seed2) rem Mod, + NSeed2 = (NSeed1 + Seed2 + 33) rem Mod, + Float = (float(NSeed1) / float(Mod))*31, + Val = trunc(Float)+64, + rnd(N - 1, [Val | List], NSeed1, NSeed2). + + +dualmap(_F, [], []) -> + []; +dualmap(F, [E1 | R1], [E2 | R2]) -> + [F(E1, E2) | dualmap(F, R1, R2)]. + +bxor_binary(B1, B2) -> + list_to_binary(dualmap(fun (E1, E2) -> + E1 bxor E2 + end, binary_to_list(B1), binary_to_list(B2))). + +password_new(Password, Salt) -> + Stage1 = crypto:sha(Password), + Stage2 = crypto:sha(Stage1), + Res = crypto:sha_final( + crypto:sha_update( + crypto:sha_update(crypto:sha_init(), Salt), + Stage2) + ), + bxor_binary(Res, Stage1). + diff --git a/plugins/emysql/src/emysql_conn.erl b/plugins/emysql/src/emysql_conn.erl new file mode 100644 index 000000000..124e23a9d --- /dev/null +++ b/plugins/emysql/src/emysql_conn.erl @@ -0,0 +1,739 @@ +%%% File : emysql_conn.erl +%%% Author : Ery Lee +%%% Purpose : connection of mysql driver +%%% Created : 11 Jan 2010 +%%% License : http://www.opengoss.com +%%% +%%% Copyright (C) 2012, www.opengoss.com +%%%---------------------------------------------------------------------- +-module(emysql_conn). + +-include("emysql.hrl"). + +-import(proplists, [get_value/2, get_value/3]). + +-behaviour(gen_server). + +%% External exports +-export([start_link/2, + info/1, + sqlquery/2, + sqlquery/3, + prepare/3, + execute/3, + execute/4, + unprepare/2]). + +%% Callback +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, { + id, + host, + port, + user, + password, + database, + encoding, + mysql_version, + recv_pid, + socket, + data}). + +%%-define(KEEPALIVE_QUERY, <<"SELECT 1;">>). + +-define(SECURE_CONNECTION, 32768). + +-define(MYSQL_QUERY_OP, 3). + +%CALL > CONNECT +-define(CALL_TIMEOUT, 301000). + +-define(CONNECT_TIMEOUT, 300000). + +-define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x + +-define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x + +%%-------------------------------------------------------------------- +%% Function: start(Opts) +%% Descrip.: Starts a mysql_conn process that connects to a MySQL +%% server, logs in and chooses a database. +%% Returns : {ok, Pid} | {error, Reason} +%% Pid = pid() +%% Reason = string() +%%-------------------------------------------------------------------- +start_link(Id, Opts) -> + gen_server:start_link(?MODULE, [Id, Opts], []). + +info(Conn) -> + gen_server:call(Conn, info). + +%%-------------------------------------------------------------------- +%% Function: sqlquery(Query) +%% Queries = A single binary() query or a list of binary() queries. +%% If a list is provided, the return value is the return +%% of the last query, or the first query that has +%% returned an error. If an error occurs, execution of +%% the following queries is aborted. +%% From = pid() or term(), use a From of self() when +%% using this module for a single connection, +%% or pass the gen_server:call/3 From argument if +%% using a gen_server to do the querys (e.g. the +%% mysql_dispatcher) +%% Timeout = integer() | infinity, gen_server timeout value +%% Descrip.: Send a query or a list of queries and wait for the result +%% if running stand-alone (From = self()), but don't block +%% the caller if we are not running stand-alone +%% (From = gen_server From). +%% Returns : ok | (non-stand-alone mode) +%% {data, #mysql_result} | (stand-alone mode) +%% {updated, #mysql_result} | (stand-alone mode) +%% {error, #mysql_result} (stand-alone mode) +%% FieldInfo = term() +%% Rows = list() of [string()] +%% Reason = term() +%%-------------------------------------------------------------------- +sqlquery(Conn, Query) -> + sqlquery(Conn, Query, ?CALL_TIMEOUT). + +sqlquery(Conn, Query, Timeout) -> + call(Conn, {sqlquery, Query}, Timeout). + +prepare(Conn, Name, Stmt) -> + call(Conn, {prepare, Name, Stmt}). + +execute(Conn, Name, Params) -> + execute(Conn, Name, Params, ?CALL_TIMEOUT). + +execute(Conn, Name, Params, Timeout) -> + call(Conn, {execute, Name, Params}, Timeout). + +unprepare(Conn, Name) -> + call(Conn, {unprepare, Name}). + +%%-------------------------------------------------------------------- +%% Function: init(Host, Port, User, Password, Database, Parent) +%% Host = string() +%% Port = integer() +%% User = string() +%% Password = string() +%% Database = string() +%% Parent = pid() of process starting this mysql_conn +%% Descrip.: Connect to a MySQL server, log in and chooses a database. +%% Report result of this to Parent, and then enter loop() if +%% we were successfull. +%% Returns : void() | does not return +%%-------------------------------------------------------------------- +init([Id, Opts]) -> + put(queries, 0), + Host = get_value(host, Opts, "localhost"), + Port = get_value(port, Opts, 3306), + UserName = get_value(username, Opts, "root"), + Password = get_value(password, Opts, "public"), + Database = get_value(database, Opts), + Encoding = get_value(encoding, Opts, utf8), + case emysql_recv:start_link(Host, Port) of + {ok, RecvPid, Sock} -> + case mysql_init(Sock, RecvPid, UserName, Password) of + {ok, Version} -> + Db = iolist_to_binary(Database), + case do_query(Sock, RecvPid, <<"use ", Db/binary>>, Version) of + {error, #mysql_result{error = Error} = _MySQLRes} -> + error_logger:error_msg("emysql_conn: use '~p' error: ~p", [Database, Error]), + {stop, using_db_error}; + {_ResultType, _MySQLRes} -> + emysql:pool(Id), %pool it + pg2:create(emysql_conn), + pg2:join(emysql_conn, self()), + EncodingBinary = list_to_binary(atom_to_list(Encoding)), + do_query(Sock, RecvPid, <<"set names '", EncodingBinary/binary, "'">>, Version), + State = #state{ + id = Id, + host = Host, + port = Port, + user = UserName, + password = Password, + database = Database, + encoding = Encoding, + mysql_version = Version, + recv_pid = RecvPid, + socket = Sock, + data = <<>>}, + {ok, State} + end; + {error, Reason} -> + {stop, {login_failed, Reason}} + end; + {error, Reason} -> + {stop, Reason} + end. + +handle_call(info, _From, #state{id = Id} = State) -> + Reply = {Id, self(), get(queries)}, + {reply, Reply, State}; + +handle_call({sqlquery, Query}, _From, #state{socket = Socket, + recv_pid = RecvPid, mysql_version = Ver} = State) -> + put(queries, get(queries) + 1), + case do_query(Socket, RecvPid, Query, Ver) of + {error, mysql_timeout} = Err -> + {stop, mysql_timeout, Err, State}; + Res -> + {reply, Res, State} + end; + +handle_call({prepare, Name, Stmt}, _From, #state{socket = Socket, + recv_pid = RecvPid, mysql_version = Ver} = State) -> + + case do_prepare(Socket, RecvPid, Name, Stmt, Ver) of + {error, mysql_timeout} -> + {stop, mysql_timeout, State}; + _ -> + {reply, ok, State} + end; + +handle_call({unprepare, Name}, _From, #state{socket = Socket, + recv_pid = RecvPid, mysql_version = Ver} = State) -> + case do_unprepare(Socket, RecvPid, Name, Ver) of + {error, mysql_timeout} -> + {stop, mysql_timeout, State}; + _ -> + {reply, ok, State} + end; + +handle_call({execute, Name, Params}, _From, #state{socket = Socket, + recv_pid = RecvPid, mysql_version = Ver} = State) -> + case do_execute(Socket, RecvPid, Name, Params, Ver) of + {error, mysql_timeout} = Err -> + {stop, mysql_timeout, Err, State}; + Res -> + {reply, Res, State} + end; + +handle_call(Req, _From, State) -> + error_logger:error_msg("badreq to emysql_conn: ~p", [Req]), + {reply, {error, badreq}, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({mysql_recv, _RecvPid, data, _Packet, SeqNum}, State) -> + error_logger:error_msg("unexpected mysql_recv: seq_num = ~p", [SeqNum]), + {noreply, State}; + +handle_info({mysql_recv, _RecvPid, closed, E}, State) -> + error_logger:error_msg("mysql socket closed: ~p", [E]), + {stop, socket_closed, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +do_queries(Sock, RecvPid, Queries, Version) -> + catch + lists:foldl( + fun(Query, _LastResponse) -> + case do_query(Sock, RecvPid, Query, Version) of + {error, _} = Err -> throw(Err); + Res -> Res + end + end, ok, Queries). + +do_query(Sock, RecvPid, Query, Version) -> + Query1 = iolist_to_binary(Query), + %?DEBUG("sqlquery ~p (id ~p)", [Query1, RecvPid]), + Packet = <>, + case do_send(Sock, Packet, 0) of + ok -> + get_query_response(RecvPid, Version); + {error, Reason} -> + {error, Reason} + end. + +do_prepare(Socket, RecvPid, Name, Stmt, Ver) -> + NameBin = atom_to_binary(Name), + StmtBin = <<"PREPARE ", NameBin/binary, " FROM '", Stmt/binary, "'">>, + do_query(Socket, RecvPid, StmtBin, Ver). + +do_execute(Socket, RecvPid, Name, Params, Ver) -> + Stmts = make_statements(Name, Params), + do_queries(Socket, RecvPid, Stmts, Ver). + +do_unprepare(Socket, RecvPid, Name, Ver) -> + NameBin = atom_to_binary(Name), + StmtBin = <<"UNPREPARE ", NameBin/binary>>, + do_query(Socket, RecvPid, StmtBin, Ver). + +make_statements(Name, []) -> + NameBin = atom_to_binary(Name), + [<<"EXECUTE ", NameBin/binary>>]; + +make_statements(Name, Params) -> + NumParams = length(Params), + ParamNums = lists:seq(1, NumParams), + NameBin = atom_to_binary(Name), + ParamNames = + lists:foldl( + fun(Num, Acc) -> + ParamName = [$@ | integer_to_list(Num)], + if Num == 1 -> + ParamName ++ Acc; + true -> + [$, | ParamName] ++ Acc + end + end, [], lists:reverse(ParamNums)), + ParamNamesBin = list_to_binary(ParamNames), + ExecStmt = <<"EXECUTE ", NameBin/binary, " USING ", + ParamNamesBin/binary>>, + + ParamVals = lists:zip(ParamNums, Params), + Stmts = lists:foldl( + fun({Num, Val}, Acc) -> + NumBin = emysql:encode(Num, true), + ValBin = emysql:encode(Val, true), + [<<"SET @", NumBin/binary, "=", ValBin/binary>> | Acc] + end, [ExecStmt], lists:reverse(ParamVals)), + Stmts. + +atom_to_binary(Val) -> + <<_:4/binary, Bin/binary>> = term_to_binary(Val), + Bin. + +%%-------------------------------------------------------------------- +%% authentication +%%-------------------------------------------------------------------- +do_old_auth(Sock, RecvPid, SeqNum, User, Password, Salt1) -> + Auth = emysql_auth:password_old(Password, Salt1), + Packet = emysql_auth:make_auth(User, Auth), + do_send(Sock, Packet, SeqNum), + do_recv(RecvPid, SeqNum). + +do_new_auth(Sock, RecvPid, SeqNum, User, Password, Salt1, Salt2) -> + Auth = emysql_auth:password_new(Password, Salt1 ++ Salt2), + Packet2 = emysql_auth:make_new_auth(User, Auth, none), + do_send(Sock, Packet2, SeqNum), + case do_recv(RecvPid, SeqNum) of + {ok, Packet3, SeqNum2} -> + case Packet3 of + <<254:8>> -> + AuthOld = emysql_auth:password_old(Password, Salt1), + do_send(Sock, <>, SeqNum2 + 1), + do_recv(RecvPid, SeqNum2 + 1); + _ -> + {ok, Packet3, SeqNum2} + end; + {error, Reason} -> + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% Function: mysql_init(Sock, RecvPid, User, Password) +%% Sock = term(), gen_tcp socket +%% RecvPid = pid(), mysql_recv process +%% User = string() +%% Password = string() +%% LogFun = undefined | function() with arity 3 +%% Descrip.: Try to authenticate on our new socket. +%% Returns : ok | {error, Reason} +%% Reason = string() +%%-------------------------------------------------------------------- +mysql_init(Sock, RecvPid, User, Password) -> + case do_recv(RecvPid, undefined) of + {ok, Packet, InitSeqNum} -> + {Version, Salt1, Salt2, Caps} = greeting(Packet), + %?DEBUG("version: ~p, ~p, ~p, ~p", [Version, Salt1, Salt2, Caps]), + AuthRes = + case Caps band ?SECURE_CONNECTION of + ?SECURE_CONNECTION -> + do_new_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, Salt2); + _ -> + do_old_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1) + end, + case AuthRes of + {ok, <<0:8, _Rest/binary>>, _RecvNum} -> + {ok,Version}; + {ok, <<255:8, _Code:16/little, Message/binary>>, _RecvNum} -> + {error, binary_to_list(Message)}; + {ok, RecvPacket, _RecvNum} -> + {error, binary_to_list(RecvPacket)}; + {error, Reason} -> + %?ERROR("init failed receiving data : ~p", [Reason]), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +greeting(Packet) -> + <<_Protocol:8, Rest/binary>> = Packet, + {Version, Rest2} = asciz(Rest), + <<_TreadID:32/little, Rest3/binary>> = Rest2, + {Salt, Rest4} = asciz(Rest3), + <> = Rest4, + <<_ServerChar:16/binary-unit:8, Rest6/binary>> = Rest5, + {Salt2, _Rest7} = asciz(Rest6), + %?DEBUG("greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p" + %"salt2 ~p", + %[Version, Protocol, Salt, Caps, ServerChar, Salt2]), + {normalize_version(Version), Salt, Salt2, Caps}. + +%% part of greeting/2 +asciz(Data) when is_binary(Data) -> + asciz_binary(Data, []); +asciz(Data) when is_list(Data) -> + {String, [0 | Rest]} = lists:splitwith(fun (C) -> + C /= 0 + end, Data), + {String, Rest}. + +%% @doc Find the first zero-byte in Data and add everything before it +%% to Acc, as a string. +%% +%% @spec asciz_binary(Data::binary(), Acc::list()) -> +%% {NewList::list(), Rest::binary()} +asciz_binary(<<>>, Acc) -> + {lists:reverse(Acc), <<>>}; +asciz_binary(<<0:8, Rest/binary>>, Acc) -> + {lists:reverse(Acc), Rest}; +asciz_binary(<>, Acc) -> + asciz_binary(Rest, [C | Acc]). + +%%-------------------------------------------------------------------- +%% Function: get_query_response(RecvPid) +%% RecvPid = pid(), mysql_recv process +%% Version = integer(), Representing MySQL version used +%% Descrip.: Wait for frames until we have a complete query response. +%% Returns : {data, #mysql_result} +%% {updated, #mysql_result} +%% {error, #mysql_result} +%% FieldInfo = list() of term() +%% Rows = list() of [string()] +%% AffectedRows = int() +%% Reason = term() +%%-------------------------------------------------------------------- +get_query_response(RecvPid, Version) -> + case do_recv(RecvPid, undefined) of + {ok, <>, _} -> + case Fieldcount of + 0 -> + %% No Tabular data + {AffectedRows, Rest1} = decode_length_binary(Rest), + {InsertId, _} = decode_length_binary(Rest1), + {updated, #mysql_result{insert_id = InsertId, affectedrows=AffectedRows}}; + 255 -> + <<_Code:16/little, Message/binary>> = Rest, + {error, #mysql_result{error=Message}}; + _ -> + %% Tabular data received + case get_fields(RecvPid, [], Version) of + {ok, Fields} -> + case get_rows(Fields, RecvPid, []) of + {ok, Rows} -> + {data, #mysql_result{fieldinfo=Fields, + rows=Rows}}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end + end; + {error, Reason} -> + {error, Reason} + end. + +decode_length_binary(<>) -> + if + Len =< 251 -> + {Len, Rest}; + Len == 252 -> %two bytes + <> = Rest, + {Val, Rest1}; + Len == 253 -> %three + <> = Rest, + {Val, Rest1}; + Len == 254 -> %eight + <> = Rest, + {Val, Rest1}; + true -> + %?ERROR("affectedrows: ~p", [Len]), + {0, Rest} + end. + +%%-------------------------------------------------------------------- +%% Function: do_recv(RecvPid, SeqNum) +%% RecvPid = pid(), mysql_recv process +%% SeqNum = undefined | integer() +%% Descrip.: Wait for a frame decoded and sent to us by RecvPid. +%% Either wait for a specific frame if SeqNum is an integer, +%% or just any frame if SeqNum is undefined. +%% Returns : {ok, Packet, Num} | +%% {error, Reason} +%% Reason = term() +%% +%% Note : Only to be used externally by the 'mysql_auth' module. +%%-------------------------------------------------------------------- +do_recv(RecvPid, SeqNum) when SeqNum == undefined -> + receive + {mysql_recv, RecvPid, data, Packet, Num} -> + {ok, Packet, Num}; + {mysql_recv, RecvPid, closed, _E} -> + {error, socket_closed} + after ?CONNECT_TIMEOUT -> + {error, mysql_timeout} + end; + +do_recv(RecvPid, SeqNum) when is_integer(SeqNum) -> + ResponseNum = SeqNum + 1, + receive + {mysql_recv, RecvPid, data, Packet, ResponseNum} -> + {ok, Packet, ResponseNum}; + {mysql_recv, RecvPid, closed, _E} -> + {error, socket_closed} + after ?CONNECT_TIMEOUT -> + {error, mysql_timeout} + end. + +call(Conn, Req) -> + gen_server:call(Conn, Req). + +call(Conn, Req, Timeout) -> + gen_server:call(Conn, Req, Timeout). + +%%-------------------------------------------------------------------- +%% Function: get_fields(RecvPid, [], Version) +%% RecvPid = pid(), mysql_recv process +%% Version = integer(), Representing MySQL version used +%% Descrip.: Received and decode field information. +%% Returns : {ok, FieldInfo} | +%% {error, Reason} +%% FieldInfo = list() of term() +%% Reason = term() +%%-------------------------------------------------------------------- +%% Support for MySQL 4.0.x: +get_fields(RecvPid, Res, ?MYSQL_4_0) -> + case do_recv(RecvPid, undefined) of + {ok, Packet, _Num} -> + case Packet of + <<254:8>> -> + {ok, lists:reverse(Res)}; + <<254:8, Rest/binary>> when size(Rest) < 8 -> + {ok, lists:reverse(Res)}; + _ -> + {Table, Rest} = get_with_length(Packet), + {Field, Rest2} = get_with_length(Rest), + {LengthB, Rest3} = get_with_length(Rest2), + LengthL = size(LengthB) * 8, + <> = LengthB, + {Type, Rest4} = get_with_length(Rest3), + {_Flags, _Rest5} = get_with_length(Rest4), + This = {Table, + Field, + Length, + %% TODO: Check on MySQL 4.0 if types are specified + %% using the same 4.1 formalism and could + %% be expanded to atoms: + Type}, + get_fields(RecvPid, [This | Res], ?MYSQL_4_0) + end; + {error, Reason} -> + {error, Reason} + end; +%% Support for MySQL 4.1.x and 5.x: +get_fields(RecvPid, Res, ?MYSQL_4_1) -> + case do_recv(RecvPid, undefined) of + {ok, Packet, _Num} -> + case Packet of + <<254:8>> -> + {ok, lists:reverse(Res)}; + <<254:8, Rest/binary>> when size(Rest) < 8 -> + {ok, lists:reverse(Res)}; + _ -> + {_Catalog, Rest} = get_with_length(Packet), + {_Database, Rest2} = get_with_length(Rest), + {Table, Rest3} = get_with_length(Rest2), + %% OrgTable is the real table name if Table is an alias + {_OrgTable, Rest4} = get_with_length(Rest3), + {Field, Rest5} = get_with_length(Rest4), + %% OrgField is the real field name if Field is an alias + {_OrgField, Rest6} = get_with_length(Rest5), + + <<_Metadata:8/little, _Charset:16/little, + Length:32/little, Type:8/little, + _Flags:16/little, _Decimals:8/little, + _Rest7/binary>> = Rest6, + + This = {Table, + Field, + Length, + get_field_datatype(Type)}, + get_fields(RecvPid, [This | Res], ?MYSQL_4_1) + end; + {error, Reason} -> + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% Function: get_rows(N, RecvPid, []) +%% N = integer(), number of rows to get +%% RecvPid = pid(), mysql_recv process +%% Descrip.: Receive and decode a number of rows. +%% Returns : {ok, Rows} | +%% {error, Reason} +%% Rows = list() of [string()] +%%-------------------------------------------------------------------- +get_rows(Fields, RecvPid, Res) -> + case do_recv(RecvPid, undefined) of + {ok, Packet, _Num} -> + case Packet of + <<254:8, Rest/binary>> when size(Rest) < 8 -> + {ok, lists:reverse(Res)}; + _ -> + {ok, This} = get_row(Fields, Packet, []), + get_rows(Fields, RecvPid, [This | Res]) + end; + {error, Reason} -> + {error, Reason} + end. + +%% part of get_rows/4 +get_row([], _Data, Res) -> + {ok, lists:reverse(Res)}; +get_row([Field | OtherFields], Data, Res) -> + {Col, Rest} = get_with_length(Data), + This = case Col of + null -> + undefined; + _ -> + convert_type(Col, element(4, Field)) + end, + get_row(OtherFields, Rest, [This | Res]). + +get_with_length(<<251:8, Rest/binary>>) -> + {null, Rest}; +get_with_length(<<252:8, Length:16/little, Rest/binary>>) -> + split_binary(Rest, Length); +get_with_length(<<253:8, Length:24/little, Rest/binary>>) -> + split_binary(Rest, Length); +get_with_length(<<254:8, Length:64/little, Rest/binary>>) -> + split_binary(Rest, Length); +get_with_length(<>) when Length < 251 -> + split_binary(Rest, Length). + + +%%-------------------------------------------------------------------- +%% Function: do_send(Sock, Packet, SeqNum) +%% Sock = term(), gen_tcp socket +%% Packet = binary() +%% SeqNum = integer(), packet sequence number +%% Descrip.: Send a packet to the MySQL server. +%% Returns : result of gen_tcp:send/2 +%%-------------------------------------------------------------------- +do_send(Sock, Packet, SeqNum) when is_binary(Packet), is_integer(SeqNum) -> + Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>, + gen_tcp:send(Sock, Data). + +%%-------------------------------------------------------------------- +%% Function: normalize_version(Version) +%% Version = string() +%% Descrip.: Return a flag corresponding to the MySQL version used. +%% The protocol used depends on this flag. +%% Returns : Version = string() +%%-------------------------------------------------------------------- +normalize_version([$4,$.,$0|_T]) -> + %?DEBUG("switching to MySQL 4.0.x protocol.", []), + ?MYSQL_4_0; +normalize_version([$4,$.,$1|_T]) -> + ?MYSQL_4_1; +normalize_version([$5|_T]) -> + %% MySQL version 5.x protocol is compliant with MySQL 4.1.x: + ?MYSQL_4_1; +normalize_version([$6|_T]) -> + %% MySQL version 6.x protocol is compliant with MySQL 4.1.x: + ?MYSQL_4_1; +normalize_version(_Other) -> + %?ERROR("MySQL version '~p' not supported: MySQL Erlang module " + % "might not work correctly.", [Other]), + %% Error, but trying the oldest protocol anyway: + ?MYSQL_4_0. + +%%-------------------------------------------------------------------- +%% Function: get_field_datatype(DataType) +%% DataType = integer(), MySQL datatype +%% Descrip.: Return MySQL field datatype as description string +%% Returns : String, MySQL datatype +%%-------------------------------------------------------------------- +get_field_datatype(0) -> 'DECIMAL'; +get_field_datatype(1) -> 'TINY'; +get_field_datatype(2) -> 'SHORT'; +get_field_datatype(3) -> 'LONG'; +get_field_datatype(4) -> 'FLOAT'; +get_field_datatype(5) -> 'DOUBLE'; +get_field_datatype(6) -> 'NULL'; +get_field_datatype(7) -> 'TIMESTAMP'; +get_field_datatype(8) -> 'LONGLONG'; +get_field_datatype(9) -> 'INT24'; +get_field_datatype(10) -> 'DATE'; +get_field_datatype(11) -> 'TIME'; +get_field_datatype(12) -> 'DATETIME'; +get_field_datatype(13) -> 'YEAR'; +get_field_datatype(14) -> 'NEWDATE'; +get_field_datatype(246) -> 'NEWDECIMAL'; +get_field_datatype(247) -> 'ENUM'; +get_field_datatype(248) -> 'SET'; +get_field_datatype(249) -> 'TINYBLOB'; +get_field_datatype(250) -> 'MEDIUM_BLOG'; +get_field_datatype(251) -> 'LONG_BLOG'; +get_field_datatype(252) -> 'BLOB'; +get_field_datatype(253) -> 'VAR_STRING'; +get_field_datatype(254) -> 'STRING'; +get_field_datatype(255) -> 'GEOMETRY'. + +convert_type(Val, ColType) -> + case ColType of + T when T == 'TINY'; + T == 'SHORT'; + T == 'LONG'; + T == 'LONGLONG'; + T == 'INT24'; + T == 'YEAR' -> + list_to_integer(binary_to_list(Val)); + T when T == 'TIMESTAMP'; + T == 'DATETIME' -> + {ok, [Year, Month, Day, Hour, Minute, Second], _Leftovers} = + io_lib:fread("~d-~d-~d ~d:~d:~d", binary_to_list(Val)), + {datetime, {{Year, Month, Day}, {Hour, Minute, Second}}}; + 'TIME' -> + {ok, [Hour, Minute, Second], _Leftovers} = + io_lib:fread("~d:~d:~d", binary_to_list(Val)), + {time, {Hour, Minute, Second}}; + 'DATE' -> + {ok, [Year, Month, Day], _Leftovers} = + io_lib:fread("~d-~d-~d", binary_to_list(Val)), + {date, {Year, Month, Day}}; + T when T == 'DECIMAL'; + T == 'NEWDECIMAL'; + T == 'FLOAT'; + T == 'DOUBLE' -> + {ok, [Num], _Leftovers} = + case io_lib:fread("~f", binary_to_list(Val)) of + {error, _} -> + io_lib:fread("~d", binary_to_list(Val)); + Res -> + Res + end, + Num; + _Other -> + Val + end. diff --git a/plugins/emysql/src/emysql_recv.erl b/plugins/emysql/src/emysql_recv.erl new file mode 100644 index 000000000..5577c2e9a --- /dev/null +++ b/plugins/emysql/src/emysql_recv.erl @@ -0,0 +1,130 @@ +%%%------------------------------------------------------------------- +%%% File : emysql_recv.erl +%%% Author : Fredrik Thulin +%%% Descrip.: Handles data being received on a MySQL socket. Decodes +%%% per-row framing and sends each row to parent. +%%% +%%% Created : 4 Aug 2005 by Fredrik Thulin +%%% +%%% Note : All MySQL code was written by Magnus Ahltorp, originally +%%% in the file mysql.erl - I just moved it here. +%%% +%%% Copyright (c) 2001-2004 Kungliga Tekniska +%%% See the file COPYING +%%% +%%% Signals this receiver process can send to it's parent +%%% (the parent is a mysql_conn connection handler) : +%%% +%%% {mysql_recv, self(), data, Packet, Num} +%%% {mysql_recv, self(), closed, {error, Reason}} +%%% {mysql_recv, self(), closed, normal} +%%% +%%% Internally (from inside init/4 to start_link/4) the +%%% following signals may be sent to the parent process : +%%% +%%% {mysql_recv, self(), init, {ok, Sock}} +%%% {mysql_recv, self(), init, {error, E}} +%%% +%%%------------------------------------------------------------------- +-module(emysql_recv). + +%%-------------------------------------------------------------------- +%% External exports (should only be used by the 'mysql_conn' module) +%%-------------------------------------------------------------------- +-export([start_link/2]). + +%callback +-export([init/3]). + +-record(state, { + socket, + parent, + log_fun, + data}). + +-define(SECURE_CONNECTION, 32768). + +-define(CONNECT_TIMEOUT, 10000). + +%%-------------------------------------------------------------------- +%% Function: start_link(Host, Port, Parent) +%% Host = string() +%% Port = integer() +%% Parent = pid(), process that should get received frames +%% Descrip.: Start a process that connects to Host:Port and waits for +%% data. When it has received a MySQL frame, it sends it to +%% Parent and waits for the next frame. +%% Returns : {ok, RecvPid, Socket} | +%% {error, Reason} +%% RecvPid = pid(), receiver process pid +%% Socket = term(), gen_tcp socket +%% Reason = atom() | string() +%%-------------------------------------------------------------------- +start_link(Host, Port) -> + proc_lib:start_link(?MODULE, init, [self(), Host, Port]). + +%%-------------------------------------------------------------------- +%% Function: init((Host, Port, Parent) +%% Host = string() +%% Port = integer() +%% Parent = pid(), process that should get received frames +%% Descrip.: Connect to Host:Port and then enter receive-loop. +%% Returns : error | never returns +%%-------------------------------------------------------------------- +init(Parent, Host, Port) -> + case gen_tcp:connect(Host, Port, [binary, {packet, 0}]) of + {ok, Sock} -> + proc_lib:init_ack(Parent, {ok, self(), Sock}), + loop(#state{socket = Sock, parent = Parent, data = <<>>}); + {error, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}) + end. + +%%-------------------------------------------------------------------- +%% Function: loop(State) +%% State = state record() +%% Descrip.: The main loop. Wait for data from our TCP socket and act +%% on received data or signals that our socket was closed. +%% Returns : error | never returns +%%-------------------------------------------------------------------- +loop(State) -> + Sock = State#state.socket, + receive + {tcp, Sock, InData} -> + NewData = list_to_binary([State#state.data, InData]), + %% send data to parent if we have enough data + Rest = sendpacket(State#state.parent, NewData), + loop(State#state{data = Rest}); + {tcp_error, Sock, Reason} -> + State#state.parent ! {mysql_recv, self(), closed, {error, Reason}}, + error; + {tcp_closed, Sock} -> + State#state.parent ! {mysql_recv, self(), closed, normal}, + error; + _Other -> %maybe system message + loop(State) + end. + +%%-------------------------------------------------------------------- +%% Function: sendpacket(Parent, Data) +%% Parent = pid() +%% Data = binary() +%% Descrip.: Check if we have received one or more complete frames by +%% now, and if so - send them to Parent. +%% Returns : Rest = binary() +%%-------------------------------------------------------------------- +%% send data to parent if we have enough data +sendpacket(Parent, Data) -> + case Data of + <> -> + if + Length =< size(D) -> + {Packet, Rest} = split_binary(D, Length), + Parent ! {mysql_recv, self(), data, Packet, Num}, + sendpacket(Parent, Rest); + true -> + Data + end; + _ -> + Data + end. diff --git a/plugins/emysql/src/emysql_sup.erl b/plugins/emysql/src/emysql_sup.erl new file mode 100644 index 000000000..fde735c61 --- /dev/null +++ b/plugins/emysql/src/emysql_sup.erl @@ -0,0 +1,33 @@ +%%%---------------------------------------------------------------------- +%%% File : emysql_sup.erl +%%% Author : Ery Lee +%%% Purpose : Mysql driver supervisor +%%% Created : 21 May 2009 +%%% Updated : 11 Jan 2010 +%%% License : http://www.opengoss.com +%%% +%%% Copyright (C) 2012, www.opengoss.com +%%%---------------------------------------------------------------------- +-module(emysql_sup). + +-author('ery.lee@gmail.com'). + +-behavior(supervisor). + +%% API +-export([start_link/1, init/1]). + +start_link(Opts) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, Opts). + +init(Opts) -> + PoolSize = proplists:get_value(pool_size, Opts, + erlang:system_info(schedulers)), + {ok, {{one_for_one, 10, 10}, + [{emysql, {emysql, start_link, [PoolSize]}, transient, + 16#ffffffff, worker, [emysql]} | + [{I, {emysql_conn, start_link, [I, Opts]}, transient, 16#ffffffff, + worker, [emysql_conn, emysql_recv]} || I <- lists:seq(1, PoolSize)]] + } + }. + diff --git a/rel/files/plugins.config b/rel/files/plugins.config index 1e8f9ca2b..737542b6b 100644 --- a/rel/files/plugins.config +++ b/rel/files/plugins.config @@ -1,6 +1,15 @@ [ - {emqttd_plugin_demo, [ - {config, value} + {emysql, [ + {pool_size, 4}, + {host, "localhost"}, + {port, 3306}, + {username, "root"}, + {password, "public"}, + {database, "mqtt"}, + {encoding, utf8} + ]}, + {emqttd_auth_mysql, [ + {user_table, mqtt_users} ]} %% % {emqttd_dashboard, [