diff --git a/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml index dee2daff6..494b42ce4 100644 --- a/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-mongo-tcp.yaml @@ -9,6 +9,8 @@ services: MONGO_INITDB_DATABASE: mqtt networks: - emqx_bridge + ports: + - "27017:27017" command: --ipv6 --bind_ip_all diff --git a/apps/emqx_authz/README.md b/apps/emqx_authz/README.md index 699781b71..de0d695ee 100644 --- a/apps/emqx_authz/README.md +++ b/apps/emqx_authz/README.md @@ -133,3 +133,16 @@ HSET mqtt_acl:emqx '$SYS/#' subscribe A rule of Redis ACL defines `publish`, `subscribe`, or `all `information. All lists in the rule are **allow** lists. +#### Mongo + +Create Example BSON documents +```sql +db.inventory.insertOne( + {username: "emqx", + clientid: "emqx", + ipaddress: "127.0.0.1", + permission: "allow", + action: "all", + topics: ["#"] + }) +``` diff --git a/apps/emqx_authz/etc/emqx_authz.conf b/apps/emqx_authz/etc/emqx_authz.conf index 89515592f..e91a68a63 100644 --- a/apps/emqx_authz/etc/emqx_authz.conf +++ b/apps/emqx_authz/etc/emqx_authz.conf @@ -43,6 +43,18 @@ emqx_authz:{ # } # cmd: "HGETALL mqtt_acl:%u" # }, + # { + # type: mongo + # config: { + # mongo_type: single + # servers: "127.0.0.1:27017" + # pool_size: 1 + # database: mqtt + # ssl: {enable: false} + # } + # collection: mqtt_acl + # find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] } + # }, { permission: allow action: all diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 6710e4f69..f673b3979 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -93,8 +93,9 @@ compile(#{topics := Topics, }; compile(#{principal := Principal, - type := redis - } = Rule) -> + type := DB + } = Rule) when DB =:= redis; + DB =:= mongo -> NRule = create_resource(Rule), NRule#{principal => compile_principal(Principal)}; diff --git a/apps/emqx_authz/src/emqx_authz_mongo.erl b/apps/emqx_authz/src/emqx_authz_mongo.erl new file mode 100644 index 000000000..a106dd0f5 --- /dev/null +++ b/apps/emqx_authz/src/emqx_authz_mongo.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_mongo). + +-include("emqx_authz.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% ACL Callbacks +-export([ authorize/4 + , description/0 + ]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +description() -> + "AuthZ with Mongo". + +authorize(Client, PubSub, Topic, + #{resource_id := ResourceID, + collection := Collection, + find := Find + }) -> + case emqx_resource:query(ResourceID, {find, Collection, replvar(Find, Client), #{}}) of + {error, Reason} -> + ?LOG(error, "[AuthZ] Query mongo error: ~p", [Reason]), + nomatch; + [] -> nomatch; + Rows -> + do_authorize(Client, PubSub, Topic, Rows) + end. + +do_authorize(_Client, _PubSub, _Topic, []) -> + nomatch; +do_authorize(Client, PubSub, Topic, [Rule | Tail]) -> + case match(Client, PubSub, Topic, Rule) + of + {matched, Permission} -> {matched, Permission}; + nomatch -> do_authorize(Client, PubSub, Topic, Tail) + end. + +match(Client, PubSub, Topic, + #{<<"topics">> := Topics, + <<"permission">> := Permission, + <<"action">> := Action + }) -> + Rule = #{<<"principal">> => all, + <<"permission">> => Permission, + <<"topics">> => Topics, + <<"action">> => Action + }, + #{simple_rule := + #{permission := NPermission} = NRule + } = hocon_schema:check_plain( + emqx_authz_schema, + #{<<"simple_rule">> => Rule}, + #{atom_key => true}, + [simple_rule]), + case emqx_authz:match(Client, PubSub, Topic, emqx_authz:compile(NRule)) of + true -> {matched, NPermission}; + false -> nomatch + end. + +replvar(Find, #{clientid := Clientid, + username := Username, + peerhost := IpAddress + }) -> + Fun = fun + _Fun(K, V, AccIn) when is_map(V) -> maps:put(K, maps:fold(_Fun, AccIn, V), AccIn); + _Fun(K, V, AccIn) when is_list(V) -> + maps:put(K, [ begin + [{K1, V1}] = maps:to_list(M), + _Fun(K1, V1, AccIn) + end || M <- V], + AccIn); + _Fun(K, V, AccIn) when is_binary(V) -> + V1 = re:replace(V, "%c", bin(Clientid), [global, {return, binary}]), + V2 = re:replace(V1, "%u", bin(Username), [global, {return, binary}]), + V3 = re:replace(V2, "%a", inet_parse:ntoa(IpAddress), [global, {return, binary}]), + maps:put(K, V3, AccIn); + _Fun(K, V, AccIn) -> maps:put(K, V, AccIn) + end, + maps:fold(Fun, #{}, Find). + +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(B) when is_binary(B) -> B; +bin(L) when is_list(L) -> list_to_binary(L); +bin(X) -> X. + diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 5836eb12b..5c82d460e 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -16,6 +16,13 @@ structs() -> ["emqx_authz"]. fields("emqx_authz") -> [ {rules, rules()} ]; +fields(mongo_connector) -> + [ {principal, principal()} + , {type, #{type => hoconsc:enum([mongo])}} + , {config, #{type => map()}} + , {collection, #{type => atom()}} + , {find, #{type => map()}} + ]; fields(redis_connector) -> [ {principal, principal()} , {type, #{type => hoconsc:enum([redis])}} @@ -27,7 +34,6 @@ fields(redis_connector) -> } , {cmd, query()} ]; - fields(sql_connector) -> [ {principal, principal() } , {type, #{type => hoconsc:enum([mysql, pgsql])}} diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 9b5609c2f..dda192252 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -38,7 +38,7 @@ structs() -> [""]. fields("") -> mongodb_fields() ++ mongodb_topology_fields() ++ - mongodb_rs_set_name_fields() ++ + % mongodb_rs_set_name_fields() ++ emqx_connector_schema_lib:ssl_fields(). on_jsonify(Config) -> @@ -71,7 +71,7 @@ on_start(InstId, #{servers := Servers, PoolName = emqx_plugin_libs_pool:pool_name(InstId), _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts), - {ok, #{pool => PoolName, + {ok, #{poolname => PoolName, type => Type, test_conn => TestConn, test_opts => TestOpts}}. @@ -82,23 +82,27 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) -> logger:debug("mongodb connector ~p received request: ~p, at state: ~p", [InstId, {Action, Collection, Selector, Docs}, State]), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of + case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of {error, Reason} -> logger:debug("mongodb connector ~p do sql query failed, request: ~p, reason: ~p", [InstId, {Action, Collection, Selector, Docs}, Reason]), - emqx_resource:query_failed(AfterQuery); - _ -> - emqx_resource:query_success(AfterQuery) - end, - Result. + emqx_resource:query_failed(AfterQuery), + {error, Reason}; + {ok, Cursor} when is_pid(Cursor) -> + emqx_resource:query_success(AfterQuery), + mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000); + Result -> + emqx_resource:query_success(AfterQuery), + Result + end. -dialyzer({nowarn_function, [on_health_check/2]}). -on_health_check(_InstId, #{test_opts := TestOpts}) -> +on_health_check(_InstId, #{test_opts := TestOpts} = State) -> case mc_worker_api:connect(TestOpts) of {ok, TestConn} -> mc_worker_api:disconnect(TestConn), - {ok, true}; + {ok, State}; {error, _} -> - {ok, false} + {error, health_check_failed, State} end. %% =================================================================== @@ -197,11 +201,12 @@ mongodb_topology_fields() -> , {min_heartbeat_frequency_ms, fun duration/1} ]. -mongodb_rs_set_name_fields() -> - [ {rs_set_name, fun emqx_connector_schema_lib:database/1} - ]. +% mongodb_rs_set_name_fields() -> +% [ {rs_set_name, fun emqx_connector_schema_lib:database/1} +% ]. auth_source(type) -> binary(); +auth_source(nullable) -> true; auth_source(_) -> undefined. servers(type) -> binary(); @@ -213,4 +218,5 @@ mongo_type(default) -> single; mongo_type(_) -> undefined. duration(type) -> emqx_schema:duration_ms(); +duration(nullable) -> true; duration(_) -> undefined. diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 4e1dc1773..6333cdb1a 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -140,7 +140,7 @@ on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) -> eredis_cluster_pool_worker:is_connected(Pid) =:= true end, Workers) of true -> {ok, State}; - false -> {error, test_query_failed, State} + false -> {error, health_check_failed, State} end; on_health_check(_InstId, #{poolname := PoolName} = State) -> emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 17069c7f0..743d37ae3 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -99,11 +99,11 @@ pool_size(validator) -> [?MIN(1), ?MAX(64)]; pool_size(_) -> undefined. username(type) -> binary(); -username(default) -> "root"; +username(nullable) -> true; username(_) -> undefined. password(type) -> binary(); -password(default) -> ""; +password(nullable) -> true; password(_) -> undefined. auto_reconnect(type) -> boolean(); diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index 71119264d..03c5bdc8f 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -54,5 +54,5 @@ health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> end || {_WorkerName, Worker} <- ecpool:workers(PoolName)], case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of true -> {ok, State}; - false -> {error, test_query_failed, State} + false -> {error, health_check_failed, State} end.