diff --git a/apps/emqx_authn/etc/emqx_authn.conf b/apps/emqx_authn/etc/emqx_authn.conf index 7c1bc711c..8af40fe8f 100644 --- a/apps/emqx_authn/etc/emqx_authn.conf +++ b/apps/emqx_authn/etc/emqx_authn.conf @@ -6,6 +6,21 @@ emqx_authn: { # mechanism: password-based # server_type: built-in-database # user_id_type: clientid + # }, + # { + # name: "authenticator2" + # mechanism: password-based + # server_type: mongodb + # server: "127.0.0.1:27017" + # database: mqtt + # collection: users + # selector: { + # username: "${mqtt-username}" + # } + # password_hash_field: password_hash + # salt_field: salt + # password_hash_algorithm: sha256 + # salt_position: prefix # } ] } diff --git a/apps/emqx_authn/include/emqx_authn.hrl b/apps/emqx_authn/include/emqx_authn.hrl index 5a8bfb66a..f9ba7c3b5 100644 --- a/apps/emqx_authn/include/emqx_authn.hrl +++ b/apps/emqx_authn/include/emqx_authn.hrl @@ -20,6 +20,8 @@ -define(VER_1, <<"1">>). -define(VER_2, <<"2">>). +-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}"). + -record(authenticator, { id :: binary() , name :: binary() @@ -35,25 +37,3 @@ }). -define(AUTH_SHARD, emqx_authn_shard). - --define(CLUSTER_CALL(Module, Func, Args), ?CLUSTER_CALL(Module, Func, Args, ok)). - --define(CLUSTER_CALL(Module, Func, Args, ResParttern), - fun() -> - case LocalResult = erlang:apply(Module, Func, Args) of - ResParttern -> - Nodes = nodes(), - {ResL, BadNodes} = rpc:multicall(Nodes, Module, Func, Args, 5000), - NResL = lists:zip(Nodes - BadNodes, ResL), - Errors = lists:filter(fun({_, ResParttern}) -> false; - (_) -> true - end, NResL), - OtherErrors = [{BadNode, node_does_not_exist} || BadNode <- BadNodes], - case Errors ++ OtherErrors of - [] -> LocalResult; - NErrors -> {error, NErrors} - end; - ErrorResult -> - {error, ErrorResult} - end - end()). diff --git a/apps/emqx_authn/src/emqx_authn.erl b/apps/emqx_authn/src/emqx_authn.erl index f41921e04..27f3eab10 100644 --- a/apps/emqx_authn/src/emqx_authn.erl +++ b/apps/emqx_authn/src/emqx_authn.erl @@ -314,6 +314,8 @@ authenticator_provider(#{mechanism := 'password-based', server_type := 'mysql'}) emqx_authn_mysql; authenticator_provider(#{mechanism := 'password-based', server_type := 'pgsql'}) -> emqx_authn_pgsql; +authenticator_provider(#{mechanism := 'password-based', server_type := 'mongodb'}) -> + emqx_authn_mongodb; authenticator_provider(#{mechanism := 'password-based', server_type := 'http-server'}) -> emqx_authn_http; authenticator_provider(#{mechanism := jwt}) -> diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 1c4d0575d..9ee8989e4 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -775,7 +775,11 @@ definitions() -> default => true }, ssl => minirest:ref(<<"ssl">>), - password_hash_algorithm => minirest:ref(<<"password_hash_algorithm">>), + password_hash_algorithm => #{ + type => string, + enum => [<<"plain">>, <<"md5">>, <<"sha">>, <<"sha256">>, <<"sha512">>, <<"bcrypt">>], + default => <<"sha256">> + }, salt_position => #{ type => string, enum => [<<"prefix">>, <<"suffix">>], @@ -822,7 +826,11 @@ definitions() -> type => boolean, default => true }, - password_hash_algorithm => minirest:ref(<<"password_hash_algorithm">>), + password_hash_algorithm => #{ + type => string, + enum => [<<"plain">>, <<"md5">>, <<"sha">>, <<"sha256">>, <<"sha512">>, <<"bcrypt">>], + default => <<"sha256">> + }, salt_position => #{ type => string, enum => [<<"prefix">>, <<"suffix">>], diff --git a/apps/emqx_authn/src/emqx_authn_schema.erl b/apps/emqx_authn/src/emqx_authn_schema.erl index 030867ed7..694d5c7ca 100644 --- a/apps/emqx_authn/src/emqx_authn_schema.erl +++ b/apps/emqx_authn/src/emqx_authn_schema.erl @@ -47,6 +47,9 @@ authenticators(type) -> hoconsc:array({union, [ hoconsc:ref(emqx_authn_mnesia, config) , hoconsc:ref(emqx_authn_mysql, config) , hoconsc:ref(emqx_authn_pgsql, config) + , hoconsc:ref(emqx_authn_mongodb, standalone) + , hoconsc:ref(emqx_authn_mongodb, 'replica-set') + , hoconsc:ref(emqx_authn_mongodb, sharded) , hoconsc:ref(emqx_authn_http, get) , hoconsc:ref(emqx_authn_http, post) , hoconsc:ref(emqx_authn_jwt, 'hmac-based') diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index 2a91584f0..c035278cc 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -16,36 +16,53 @@ -module(emqx_authn_utils). --export([ replace_placeholder/2 +-export([ replace_placeholders/2 + , replace_placeholder/2 , gen_salt/0 + , bin/1 ]). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ -replace_placeholder(PlaceHolders, Data) -> - replace_placeholder(PlaceHolders, Data, []). +replace_placeholders(PlaceHolders, Data) -> + replace_placeholders(PlaceHolders, Data, []). -replace_placeholder([], _Data, Acc) -> +replace_placeholders([], _Credential, Acc) -> lists:reverse(Acc); -replace_placeholder([<<"${mqtt-username}">> | More], #{username := Username} = Data, Acc) -> - replace_placeholder(More, Data, [convert_to_sql_param(Username) | Acc]); -replace_placeholder([<<"${mqtt-clientid}">> | More], #{clientid := ClientID} = Data, Acc) -> - replace_placeholder(More, Data, [convert_to_sql_param(ClientID) | Acc]); -replace_placeholder([<<"${ip-address}">> | More], #{peerhost := IPAddress} = Data, Acc) -> - replace_placeholder(More, Data, [convert_to_sql_param(IPAddress) | Acc]); -replace_placeholder([<<"${cert-subject}">> | More], #{dn := Subject} = Data, Acc) -> - replace_placeholder(More, Data, [convert_to_sql_param(Subject) | Acc]); -replace_placeholder([<<"${cert-common-name}">> | More], #{cn := CommonName} = Data, Acc) -> - replace_placeholder(More, Data, [convert_to_sql_param(CommonName) | Acc]); -replace_placeholder([_ | More], Data, Acc) -> - replace_placeholder(More, Data, [null | Acc]). +replace_placeholders([Placeholder | More], Credential, Acc) -> + case replace_placeholder(Placeholder, Credential) of + undefined -> + error({cannot_get_variable, Placeholder}); + V -> + replace_placeholders(More, Credential, [convert_to_sql_param(V) | Acc]) + end. + +replace_placeholder(<<"${mqtt-username}">>, Credential) -> + maps:get(username, Credential, undefined); +replace_placeholder(<<"${mqtt-clientid}">>, Credential) -> + maps:get(clientid, Credential, undefined); +replace_placeholder(<<"${mqtt-password}">>, Credential) -> + maps:get(password, Credential, undefined); +replace_placeholder(<<"${ip-address}">>, Credential) -> + maps:get(peerhost, Credential, undefined); +replace_placeholder(<<"${cert-subject}">>, Credential) -> + maps:get(dn, Credential, undefined); +replace_placeholder(<<"${cert-common-name}">>, Credential) -> + maps:get(cn, Credential, undefined); +replace_placeholder(Constant, _) -> + Constant. + gen_salt() -> <> = crypto:strong_rand_bytes(16), iolist_to_binary(io_lib:format("~32.16.0b", [X])). +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(L) when is_list(L) -> list_to_binary(L); +bin(X) -> X. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ @@ -53,8 +70,4 @@ gen_salt() -> convert_to_sql_param(undefined) -> null; convert_to_sql_param(V) -> - bin(V). - -bin(A) when is_atom(A) -> atom_to_binary(A, utf8); -bin(L) when is_list(L) -> list_to_binary(L); -bin(X) -> X. + bin(V). \ No newline at end of file diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 52c301946..56629c568 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -132,7 +132,6 @@ destroy(#{user_group := UserGroup}) -> end, mnesia:select(?TAB, MatchSpec, write)) end). -%% TODO: binary to atom add_user(#{user_id := UserID, password := Password}, #{user_group := UserGroup} = State) -> trans( diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 964e78a02..aa10a3b98 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -17,6 +17,7 @@ -module(emqx_authn_http). -include("emqx_authn.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("typerefl/include/types.hrl"). -behaviour(hocon_schema). @@ -122,15 +123,16 @@ create(#{ method := Method , headers => normalize_headers(Headers) , form_data => maps:to_list(FormData) , request_timeout => RequestTimeout + , '_unique' => Unique }, case emqx_resource:create_local(Unique, emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}) of {ok, _} -> - {ok, State#{resource_id => Unique}}; + {ok, State}; {error, already_created} -> - {ok, State#{resource_id => Unique}}; + {ok, State}; {error, Reason} -> {error, Reason} end. @@ -146,27 +148,33 @@ update(Config, State) -> authenticate(#{auth_method := _}, _) -> ignore; -authenticate(Credential, #{resource_id := ResourceID, +authenticate(Credential, #{'_unique' := Unique, method := Method, request_timeout := RequestTimeout} = State) -> - Request = generate_request(Credential, State), - case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of - {ok, 204, _Headers} -> ok; - {ok, 200, Headers, Body} -> - ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>), - case safely_parse_body(ContentType, Body) of - {ok, _NBody} -> - %% TODO: Return by user property - ok; - {error, _Reason} -> - ok - end; - {error, _Reason} -> + try + Request = generate_request(Credential, State), + case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of + {ok, 204, _Headers} -> ok; + {ok, 200, Headers, Body} -> + ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>), + case safely_parse_body(ContentType, Body) of + {ok, _NBody} -> + %% TODO: Return by user property + ok; + {error, _Reason} -> + ok + end; + {error, _Reason} -> + ignore + end + catch + error:Reason -> + ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]), ignore end. -destroy(#{resource_id := ResourceID}) -> - _ = emqx_resource:remove_local(ResourceID), +destroy(#{'_unique' := Unique}) -> + _ = emqx_resource:remove_local(Unique), ok. %%-------------------------------------------------------------------- @@ -242,31 +250,18 @@ generate_request(Credential, #{method := Method, {NPath, Headers, Body} end. -replace_placeholders(FormData0, Credential) -> - FormData = lists:map(fun({K, V0}) -> - case replace_placeholder(V0, Credential) of - undefined -> {K, undefined}; - V -> {K, bin(V)} - end - end, FormData0), - lists:filter(fun({_, V}) -> - V =/= undefined - end, FormData). +replace_placeholders(KVs, Credential) -> + replace_placeholders(KVs, Credential, []). -replace_placeholder(<<"${mqtt-username}">>, Credential) -> - maps:get(username, Credential, undefined); -replace_placeholder(<<"${mqtt-clientid}">>, Credential) -> - maps:get(clientid, Credential, undefined); -replace_placeholder(<<"${mqtt-password}">>, Credential) -> - maps:get(password, Credential, undefined); -replace_placeholder(<<"${ip-address}">>, Credential) -> - maps:get(peerhost, Credential, undefined); -replace_placeholder(<<"${cert-subject}">>, Credential) -> - maps:get(dn, Credential, undefined); -replace_placeholder(<<"${cert-common-name}">>, Credential) -> - maps:get(cn, Credential, undefined); -replace_placeholder(Constant, _) -> - Constant. +replace_placeholders([], _Credential, Acc) -> + lists:reverse(Acc); +replace_placeholders([{K, V0} | More], Credential, Acc) -> + case emqx_authn_utils:replace_placeholder(V0, Credential) of + undefined -> + error({cannot_get_variable, V0}); + V -> + replace_placeholders(More, Credential, [{K, emqx_authn_utils:bin(V)} | Acc]) + end. append_query(Path, []) -> Path; @@ -300,8 +295,4 @@ parse_body(<<"application/json">>, Body) -> parse_body(<<"application/x-www-form-urlencoded">>, Body) -> {ok, cow_qs:parse_qs(Body)}; parse_body(ContentType, _) -> - {error, {unsupported_content_type, ContentType}}. - -bin(A) when is_atom(A) -> atom_to_binary(A, utf8); -bin(L) when is_list(L) -> list_to_binary(L); -bin(X) -> X. \ No newline at end of file + {error, {unsupported_content_type, ContentType}}. \ No newline at end of file diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl new file mode 100644 index 000000000..4b9fab2be --- /dev/null +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -0,0 +1,227 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authn_mongodb). + +-include("emqx_authn.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([ structs/0 + , fields/1 + ]). + +-export([ create/1 + , update/2 + , authenticate/2 + , destroy/1 + ]). + +%%------------------------------------------------------------------------------ +%% Hocon Schema +%%------------------------------------------------------------------------------ + +structs() -> [""]. + +fields("") -> + [ {config, {union, [ hoconsc:t(standalone) + , hoconsc:t('replica-set') + , hoconsc:t(sharded) + ]}} + ]; + +fields(standalone) -> + common_fields() ++ emqx_connector_mongo:fields(single); + +fields('replica-set') -> + common_fields() ++ emqx_connector_mongo:fields(rs); + +fields(sharded) -> + common_fields() ++ emqx_connector_mongo:fields(sharded). + +common_fields() -> + [ {name, fun emqx_authn_schema:authenticator_name/1} + , {mechanism, {enum, ['password-based']}} + , {server_type, {enum, [mongodb]}} + , {collection, fun collection/1} + , {selector, fun selector/1} + , {password_hash_field, fun password_hash_field/1} + , {salt_field, fun salt_field/1} + , {password_hash_algorithm, fun password_hash_algorithm/1} + , {salt_position, fun salt_position/1} + ]. + +collection(type) -> binary(); +collection(nullable) -> false; +collection(_) -> undefined. + +selector(type) -> map(); +selector(nullable) -> false; +selector(_) -> undefined. + +password_hash_field(type) -> binary(); +password_hash_field(nullable) -> false; +password_hash_field(_) -> undefined. + +salt_field(type) -> binary(); +salt_field(nullable) -> true; +salt_field(_) -> undefined. + +password_hash_algorithm(type) -> {enum, [plain, md5, sha, sha256, sha512, bcrypt]}; +password_hash_algorithm(default) -> sha256; +password_hash_algorithm(_) -> undefined. + +salt_position(type) -> {enum, [prefix, suffix]}; +salt_position(default) -> prefix; +salt_position(_) -> undefined. + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + +create(#{ selector := Selector + , '_unique' := Unique + } = Config) -> + NSelector = parse_selector(Selector), + State = maps:with([ collection + , password_hash_field + , salt_field + , password_hash_algorithm + , salt_position + , '_unique'], Config), + NState = State#{selector => NSelector}, + case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of + {ok, _} -> + {ok, NState}; + {error, already_created} -> + {ok, NState}; + {error, Reason} -> + {error, Reason} + end. + +update(Config, State) -> + case create(Config) of + {ok, NewState} -> + ok = destroy(State), + {ok, NewState}; + {error, Reason} -> + {error, Reason} + end. + +authenticate(#{auth_method := _}, _) -> + ignore; +authenticate(#{password := Password} = Credential, + #{ collection := Collection + , selector := Selector0 + , '_unique' := Unique + } = State) -> + try + Selector1 = replace_placeholders(Selector0, Credential), + Selector2 = normalize_selector(Selector1), + case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of + undefined -> ignore; + {error, Reason} -> + ?LOG(error, "['~s'] Query failed: ~p", [Unique, Reason]), + ignore; + Doc -> + case check_password(Password, Doc, State) of + ok -> ok; + {error, {cannot_find_password_hash_field, PasswordHashField}} -> + ?LOG(error, "['~s'] Can't find password hash field: ~s", [Unique, PasswordHashField]), + {error, bad_username_or_password}; + {error, Reason} -> + {error, Reason} + end + end + catch + error:Error -> + ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]), + ignore + end. + +destroy(#{'_unique' := Unique}) -> + _ = emqx_resource:remove_local(Unique), + ok. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +parse_selector(Selector) -> + NSelector = emqx_json:encode(Selector), + Tokens = re:split(NSelector, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]), + parse_selector(Tokens, []). + +parse_selector([], Acc) -> + lists:reverse(Acc); +parse_selector([[Constant, Placeholder] | Tokens], Acc) -> + parse_selector(Tokens, [{placeholder, Placeholder}, {constant, Constant} | Acc]); +parse_selector([[Constant] | Tokens], Acc) -> + parse_selector(Tokens, [{constant, Constant} | Acc]). + +replace_placeholders(Selector, Credential) -> + lists:map(fun({constant, Constant}) -> + Constant; + ({placeholder, Placeholder}) -> + case emqx_authn_utils:replace_placeholder(Placeholder, Credential) of + undefined -> error({cannot_get_variable, Placeholder}); + Value -> Value + end + end, Selector). + +normalize_selector(Selector) -> + emqx_json:decode(iolist_to_binary(Selector), [return_maps]). + +check_password(undefined, _Selected, _State) -> + {error, bad_username_or_password}; +check_password(Password, + Doc, + #{password_hash_algorithm := bcrypt, + password_hash_field := PasswordHashField}) -> + case maps:get(PasswordHashField, Doc, undefined) of + undefined -> + {error, {cannot_find_password_hash_field, PasswordHashField}}; + Hash -> + case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of + true -> ok; + false -> {error, bad_username_or_password} + end + end; +check_password(Password, + Doc, + #{password_hash_algorithm := Algorithm, + password_hash_field := PasswordHashField, + salt_position := SaltPosition} = State) -> + case maps:get(PasswordHashField, Doc, undefined) of + undefined -> + {error, {cannot_find_password_hash_field, PasswordHashField}}; + Hash -> + Salt = case maps:get(salt_field, State, undefined) of + undefined -> <<>>; + SaltField -> maps:get(SaltField, Doc, <<>>) + end, + case Hash =:= hash(Algorithm, Password, Salt, SaltPosition) of + true -> ok; + false -> {error, bad_username_or_password} + end + end. + +hash(Algorithm, Password, Salt, prefix) -> + emqx_passwd:hash(Algorithm, <>); +hash(Algorithm, Password, Salt, suffix) -> + emqx_passwd:hash(Algorithm, <>). diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 200d7afc4..62c5c49e7 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -17,6 +17,7 @@ -module(emqx_authn_mysql). -include("emqx_authn.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("typerefl/include/types.hrl"). -behaviour(hocon_schema). @@ -46,25 +47,12 @@ fields(config) -> , {query, fun query/1} , {query_timeout, fun query_timeout/1} ] ++ emqx_connector_schema_lib:relational_db_fields() - ++ emqx_connector_schema_lib:ssl_fields(); + ++ emqx_connector_schema_lib:ssl_fields(). -fields(bcrypt) -> - [ {name, {enum, [bcrypt]}} - , {salt_rounds, fun salt_rounds/1} - ]; - -fields(other_algorithms) -> - [ {name, {enum, [plain, md5, sha, sha256, sha512]}} - ]. - -password_hash_algorithm(type) -> {union, [hoconsc:ref(bcrypt), hoconsc:ref(other_algorithms)]}; -password_hash_algorithm(default) -> #{<<"name">> => sha256}; +password_hash_algorithm(type) -> {enum, [plain, md5, sha, sha256, sha512, bcrypt]}; +password_hash_algorithm(default) -> sha256; password_hash_algorithm(_) -> undefined. -salt_rounds(type) -> integer(); -salt_rounds(default) -> 10; -salt_rounds(_) -> undefined. - salt_position(type) -> {enum, [prefix, suffix]}; salt_position(default) -> prefix; salt_position(_) -> undefined. @@ -92,12 +80,13 @@ create(#{ password_hash_algorithm := Algorithm salt_position => SaltPosition, query => Query, placeholders => PlaceHolders, - query_timeout => QueryTimeout}, + query_timeout => QueryTimeout, + '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of {ok, _} -> - {ok, State#{resource_id => Unique}}; + {ok, State}; {error, already_created} -> - {ok, State#{resource_id => Unique}}; + {ok, State}; {error, Reason} -> {error, Reason} end. @@ -114,36 +103,41 @@ update(Config, State) -> authenticate(#{auth_method := _}, _) -> ignore; authenticate(#{password := Password} = Credential, - #{resource_id := ResourceID, - placeholders := PlaceHolders, + #{placeholders := PlaceHolders, query := Query, - query_timeout := Timeout} = State) -> - Params = emqx_authn_utils:replace_placeholder(PlaceHolders, Credential), - case emqx_resource:query(ResourceID, {sql, Query, Params, Timeout}) of - {ok, _Columns, []} -> ignore; - {ok, Columns, Rows} -> - %% TODO: Support superuser - Selected = maps:from_list(lists:zip(Columns, Rows)), - check_password(Password, Selected, State); - {error, _Reason} -> + query_timeout := Timeout, + '_unique' := Unique} = State) -> + try + Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), + case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of + {ok, _Columns, []} -> ignore; + {ok, Columns, Rows} -> + %% TODO: Support superuser + Selected = maps:from_list(lists:zip(Columns, Rows)), + check_password(Password, Selected, State); + {error, _Reason} -> + ignore + end + catch + error:Reason -> + ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]), ignore end. -destroy(#{resource_id := ResourceID}) -> - _ = emqx_resource:remove_local(ResourceID), +destroy(#{'_unique' := Unique}) -> + _ = emqx_resource:remove_local(Unique), ok. %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ -check_password(undefined, _Algorithm, _Selected) -> +check_password(undefined, _Selected, _State) -> {error, bad_username_or_password}; check_password(Password, #{password_hash := Hash}, #{password_hash_algorithm := bcrypt}) -> - {ok, Hash0} = bcrypt:hashpw(Password, Hash), - case list_to_binary(Hash0) =:= Hash of + case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of true -> ok; false -> {error, bad_username_or_password} end; @@ -163,7 +157,7 @@ check_password(Password, %% TODO: Support prepare parse_query(Query) -> - case re:run(Query, "\\$\\{[a-z0-9\\_]+\\}", [global, {capture, all, binary}]) of + case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of {match, Captured} -> PlaceHolders = [PlaceHolder || PlaceHolder <- Captured], NQuery = re:replace(Query, "'\\$\\{[a-z0-9\\_]+\\}'", "?", [global, {return, binary}]), diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 100e5cce7..a4d00be29 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -17,6 +17,7 @@ -module(emqx_authn_pgsql). -include("emqx_authn.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("typerefl/include/types.hrl"). -behaviour(hocon_schema). @@ -45,7 +46,8 @@ fields(config) -> ] ++ emqx_connector_schema_lib:relational_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). -password_hash_algorithm(type) -> string(); +password_hash_algorithm(type) -> {enum, [plain, md5, sha, sha256, sha512, bcrypt]}; +password_hash_algorithm(default) -> sha256; password_hash_algorithm(_) -> undefined. query(type) -> string(); @@ -65,12 +67,13 @@ create(#{ query := Query0 State = #{query => Query, placeholders => PlaceHolders, password_hash_algorithm => Algorithm, - salt_position => SaltPosition}, + salt_position => SaltPosition, + '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of {ok, _} -> - {ok, State#{resource_id => Unique}}; + {ok, State}; {error, already_created} -> - {ok, State#{resource_id => Unique}}; + {ok, State}; {error, Reason} -> {error, Reason} end. @@ -87,35 +90,40 @@ update(Config, State) -> authenticate(#{auth_method := _}, _) -> ignore; authenticate(#{password := Password} = Credential, - #{resource_id := ResourceID, - query := Query, - placeholders := PlaceHolders} = State) -> - Params = emqx_authn_utils:replace_placeholder(PlaceHolders, Credential), - case emqx_resource:query(ResourceID, {sql, Query, Params}) of - {ok, _Columns, []} -> ignore; - {ok, Columns, Rows} -> - %% TODO: Support superuser - Selected = maps:from_list(lists:zip(Columns, Rows)), - check_password(Password, Selected, State); - {error, _Reason} -> + #{query := Query, + placeholders := PlaceHolders, + '_unique' := Unique} = State) -> + try + Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), + case emqx_resource:query(Unique, {sql, Query, Params}) of + {ok, _Columns, []} -> ignore; + {ok, Columns, Rows} -> + %% TODO: Support superuser + Selected = maps:from_list(lists:zip(Columns, Rows)), + check_password(Password, Selected, State); + {error, _Reason} -> + ignore + end + catch + error:Reason -> + ?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]), ignore end. -destroy(#{resource_id := ResourceID}) -> - _ = emqx_resource:remove_local(ResourceID), +destroy(#{'_unique' := Unique}) -> + _ = emqx_resource:remove_local(Unique), ok. %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ -check_password(undefined, _Algorithm, _Selected) -> +check_password(undefined, _Selected, _State) -> {error, bad_username_or_password}; check_password(Password, #{password_hash := Hash}, #{password_hash_algorithm := bcrypt}) -> - {ok, Hash0} = bcrypt:hashpw(Password, Hash), - case list_to_binary(Hash0) =:= Hash of + case {ok, Hash} =:= bcrypt:hashpw(Password, Hash) of true -> ok; false -> {error, bad_username_or_password} end; @@ -135,7 +143,7 @@ check_password(Password, %% TODO: Support prepare parse_query(Query) -> - case re:run(Query, "\\$\\{[a-z0-9\\_]+\\}", [global, {capture, all, binary}]) of + case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of {match, Captured} -> PlaceHolders = [PlaceHolder || PlaceHolder <- Captured], Replacements = ["$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index daddb7e13..f21fb4bf4 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -149,11 +149,14 @@ connect(Opts) -> WorkerOptions = proplists:get_value(worker_options, Opts, []), mongo_api:connect(Type, Hosts, Options, WorkerOptions). -mongo_query(Conn, find, Collection, Selector, Docs) -> - mongo_api:find(Conn, Collection, Selector, Docs); +mongo_query(Conn, find, Collection, Selector, Projector) -> + mongo_api:find(Conn, Collection, Selector, Projector); + +mongo_query(Conn, find_one, Collection, Selector, Projector) -> + mongo_api:find_one(Conn, Collection, Selector, Projector); %% Todo xxx -mongo_query(_Conn, _Action, _Collection, _Selector, _Docs) -> +mongo_query(_Conn, _Action, _Collection, _Selector, _Projector) -> ok. do_start(InstId, Opts0, Config = #{mongo_type := Type, diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 7dcf24be5..d0b314077 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -71,7 +71,7 @@ ssl_fields() -> [ hoconsc:ref(?MODULE, ssl_on) , hoconsc:ref(?MODULE, ssl_off) ]), - default => hoconsc:ref(?MODULE, ssl_off) + default => #{<<"enable">> => false} } } ].