refactor(authn resources): add `emqx_resource` and `emqx_authn` tests

This commit is contained in:
Ilya Averyanov 2021-11-16 10:10:04 +03:00
parent b59ad6b186
commit 071c2c99e8
57 changed files with 1714 additions and 976 deletions

View File

@ -0,0 +1,49 @@
.PHONY: help up down ct ct-all bash run
define usage
make -f .ci/docker-compose-file/Makefile.local up
make -f .ci/docker-compose-file/Makefile.local ct CONTAINER=erlang24 SUITE=apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl
make -f .ci/docker-compose-file/Makefile.local down
endef
export usage
help:
@echo "$$usage"
up:
env \
MYSQL_TAG=8 \
REDIS_TAG=6 \
MONGO_TAG=4 \
PGSQL_TAG=13 \
LDAP_TAG=2.4.50 \
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
up -d --build
down:
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
down
ct:
docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' -v --suite $(SUITE)"
ct-all:
docker exec -i "$(CONTAINER)" bash -c "make ct"
bash:
docker exec -it "$(CONTAINER)" bash
run:
docker exec -it "$(CONTAINER)" bash -c "make run";

View File

@ -10,9 +10,9 @@ RUN wget ftp://ftp.openldap.org/pub/OpenLDAP/openldap-release/openldap-${LDAP_TA
&& cd .. && rm -rf openldap-${LDAP_TAG}
COPY .ci/docker-compose-file/openldap/slapd.conf /usr/local/etc/openldap/slapd.conf
COPY apps/emqx_auth_ldap/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
COPY apps/emqx_auth_ldap/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
COPY apps/emqx_auth_ldap/test/certs/*.pem /usr/local/etc/openldap/
COPY apps/emqx_authn/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
COPY apps/emqx_authn/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
COPY apps/emqx_authn/test/data/certs/*.pem /usr/local/etc/openldap/
RUN mkdir -p /usr/local/etc/openldap/data \
&& slapadd -l /usr/local/etc/openldap/schema/emqx.io.ldif -f /usr/local/etc/openldap/slapd.conf

View File

@ -67,9 +67,11 @@ jobs:
- uses: actions/checkout@v2
- name: docker compose up
env:
REDIS_TAG: 6
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
-f .ci/docker-compose-file/docker-compose.yaml \
up -d --build
- name: run eunit

View File

@ -95,9 +95,6 @@
-define(CHAINS_TAB, emqx_authn_chains).
-define(VER_1, <<"1">>).
-define(VER_2, <<"2">>).
-type chain_name() :: atom().
-type authenticator_id() :: binary().
-type position() :: top | bottom | {before, authenticator_id()}.
@ -123,10 +120,10 @@
%% parse and validate it, and reutrn parsed result.
-callback check_config(config()) -> config().
-callback create(Config)
-callback create(AuthenticatorID, Config)
-> {ok, State}
| {error, term()}
when Config::config(), State::state().
when AuthenticatorID::authenticator_id(), Config::config(), State::state().
-callback update(Config, State)
-> {ok, NewState}
@ -195,29 +192,6 @@ authenticate(#{listener := Listener, protocol := Protocol} = Credential, _AuthRe
NAuthenticators -> do_authenticate(NAuthenticators, Credential)
end.
do_authenticate([], _) ->
{stop, {error, not_authorized}};
do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
try Provider:authenticate(Credential, State) of
ignore ->
do_authenticate(More, Credential);
Result ->
%% {ok, Extra}
%% {ok, Extra, AuthData}
%% {continue, AuthCache}
%% {continue, AuthData, AuthCache}
%% {error, Reason}
{stop, Result}
catch
Class:Reason:Stacktrace ->
?SLOG(warning, #{msg => "unexpected_error_in_authentication",
exception => Class,
reason => Reason,
stacktrace => Stacktrace,
authenticator => ID}),
do_authenticate(More, Credential)
end.
get_authenticators(Listener, Global) ->
case ets:lookup(?CHAINS_TAB, Listener) of
[#chain{authenticators = Authenticators}] ->
@ -344,11 +318,13 @@ create_authenticator(ChainName, Config) ->
delete_authenticator(ChainName, AuthenticatorID) ->
call({delete_authenticator, ChainName, AuthenticatorID}).
-spec update_authenticator(chain_name(), authenticator_id(), config()) -> {ok, authenticator()} | {error, term()}.
-spec update_authenticator(chain_name(), authenticator_id(), config()) ->
{ok, authenticator()} | {error, term()}.
update_authenticator(ChainName, AuthenticatorID, Config) ->
call({update_authenticator, ChainName, AuthenticatorID, Config}).
-spec lookup_authenticator(chain_name(), authenticator_id()) -> {ok, authenticator()} | {error, term()}.
-spec lookup_authenticator(chain_name(), authenticator_id()) ->
{ok, authenticator()} | {error, term()}.
lookup_authenticator(ChainName, AuthenticatorID) ->
case ets:lookup(?CHAINS_TAB, ChainName) of
[] ->
@ -379,7 +355,8 @@ move_authenticator(ChainName, AuthenticatorID, Position) ->
import_users(ChainName, AuthenticatorID, Filename) ->
call({import_users, ChainName, AuthenticatorID, Filename}).
-spec add_user(chain_name(), authenticator_id(), user_info()) -> {ok, user_info()} | {error, term()}.
-spec add_user(chain_name(), authenticator_id(), user_info()) ->
{ok, user_info()} | {error, term()}.
add_user(ChainName, AuthenticatorID, UserInfo) ->
call({add_user, ChainName, AuthenticatorID, UserInfo}).
@ -387,11 +364,13 @@ add_user(ChainName, AuthenticatorID, UserInfo) ->
delete_user(ChainName, AuthenticatorID, UserID) ->
call({delete_user, ChainName, AuthenticatorID, UserID}).
-spec update_user(chain_name(), authenticator_id(), binary(), map()) -> {ok, user_info()} | {error, term()}.
-spec update_user(chain_name(), authenticator_id(), binary(), map()) ->
{ok, user_info()} | {error, term()}.
update_user(ChainName, AuthenticatorID, UserID, NewUserInfo) ->
call({update_user, ChainName, AuthenticatorID, UserID, NewUserInfo}).
-spec lookup_user(chain_name(), authenticator_id(), binary()) -> {ok, user_info()} | {error, term()}.
-spec lookup_user(chain_name(), authenticator_id(), binary()) ->
{ok, user_info()} | {error, term()}.
lookup_user(ChainName, AuthenticatorID, UserID) ->
call({lookup_user, ChainName, AuthenticatorID, UserID}).
@ -441,87 +420,36 @@ handle_call({delete_chain, Name}, _From, State) ->
[] ->
reply({error, {not_found, {chain, Name}}}, State);
[#chain{authenticators = Authenticators}] ->
_ = [do_delete_authenticator(Authenticator) || Authenticator <- Authenticators],
_ = [do_destroy_authenticator(Authenticator) || Authenticator <- Authenticators],
true = ets:delete(?CHAINS_TAB, Name),
reply(ok, maybe_unhook(State))
end;
handle_call({create_authenticator, ChainName, Config}, _From, #{providers := Providers} = State) ->
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
AuthenticatorID = authenticator_id(Config),
case lists:keymember(AuthenticatorID, #authenticator.id, Authenticators) of
true ->
{error, {already_exists, {authenticator, AuthenticatorID}}};
false ->
case do_create_authenticator(ChainName, AuthenticatorID, Config, Providers) of
{ok, Authenticator} ->
NAuthenticators = Authenticators ++ [Authenticator#authenticator{enable = maps:get(enable, Config)}],
true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NAuthenticators}),
{ok, serialize_authenticator(Authenticator)};
{error, Reason} ->
{error, Reason}
end
end
end,
UpdateFun = fun(Chain) ->
handle_create_authenticator(Chain, Config, Providers)
end,
Reply = update_chain(ChainName, UpdateFun),
reply(Reply, maybe_hook(State));
handle_call({delete_authenticator, ChainName, AuthenticatorID}, _From, State) ->
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case lists:keytake(AuthenticatorID, #authenticator.id, Authenticators) of
false ->
{error, {not_found, {authenticator, AuthenticatorID}}};
{value, Authenticator, NAuthenticators} ->
_ = do_delete_authenticator(Authenticator),
true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NAuthenticators}),
ok
end
end,
UpdateFun = fun(Chain) ->
handle_delete_authenticator(Chain, AuthenticatorID)
end,
Reply = update_chain(ChainName, UpdateFun),
reply(Reply, maybe_unhook(State));
handle_call({update_authenticator, ChainName, AuthenticatorID, Config}, _From, State) ->
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case lists:keyfind(AuthenticatorID, #authenticator.id, Authenticators) of
false ->
{error, {not_found, {authenticator, AuthenticatorID}}};
#authenticator{provider = Provider,
state = #{version := Version} = ST} = Authenticator ->
case AuthenticatorID =:= authenticator_id(Config) of
true ->
Unique = unique(ChainName, AuthenticatorID, Version),
case Provider:update(Config#{'_unique' => Unique}, ST) of
{ok, NewST} ->
NewAuthenticator = Authenticator#authenticator{state = switch_version(NewST#{version => Version}),
enable = maps:get(enable, Config)},
NewAuthenticators = replace_authenticator(AuthenticatorID, NewAuthenticator, Authenticators),
true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NewAuthenticators}),
{ok, serialize_authenticator(NewAuthenticator)};
{error, Reason} ->
{error, Reason}
end;
false ->
{error, change_of_authentication_type_is_not_allowed}
end
end
end,
UpdateFun = fun(Chain) ->
handle_update_authenticator(Chain, AuthenticatorID, Config)
end,
Reply = update_chain(ChainName, UpdateFun),
reply(Reply, State);
handle_call({move_authenticator, ChainName, AuthenticatorID, Position}, _From, State) ->
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case do_move_authenticator(AuthenticatorID, Authenticators, Position) of
{ok, NAuthenticators} ->
true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NAuthenticators}),
ok;
{error, Reason} ->
{error, Reason}
end
end,
UpdateFun = fun(Chain) ->
handle_move_authenticator(Chain, AuthenticatorID, Position)
end,
Reply = update_chain(ChainName, UpdateFun),
reply(Reply, State);
@ -575,13 +503,105 @@ terminate(Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Private functions
%%------------------------------------------------------------------------------
handle_update_authenticator(Chain, AuthenticatorID, Config) ->
#chain{authenticators = Authenticators} = Chain,
case lists:keyfind(AuthenticatorID, #authenticator.id, Authenticators) of
false ->
{error, {not_found, {authenticator, AuthenticatorID}}};
#authenticator{provider = Provider, state = ST} = Authenticator ->
case AuthenticatorID =:= authenticator_id(Config) of
true ->
case Provider:update(Config, ST) of
{ok, NewST} ->
NewAuthenticator = Authenticator#authenticator{
state = NewST,
enable = maps:get(enable, Config)},
NewAuthenticators = replace_authenticator(
AuthenticatorID,
NewAuthenticator,
Authenticators),
true = ets:insert(
?CHAINS_TAB,
Chain#chain{authenticators = NewAuthenticators}),
{ok, serialize_authenticator(NewAuthenticator)};
{error, Reason} ->
{error, Reason}
end;
false ->
{error, change_of_authentication_type_is_not_allowed}
end
end.
handle_delete_authenticator(Chain, AuthenticatorID) ->
MatchFun = fun(#authenticator{id = ID}) ->
ID =:= AuthenticatorID
end,
case do_delete_authenticators(MatchFun, Chain) of
[] -> {error, {not_found, {authenticator, AuthenticatorID}}};
[AuthenticatorID] -> ok
end.
handle_move_authenticator(Chain, AuthenticatorID, Position) ->
#chain{authenticators = Authenticators} = Chain,
case do_move_authenticator(AuthenticatorID, Authenticators, Position) of
{ok, NAuthenticators} ->
true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = NAuthenticators}),
ok;
{error, Reason} ->
{error, Reason}
end.
handle_create_authenticator(Chain, Config, Providers) ->
#chain{authenticators = Authenticators} = Chain,
AuthenticatorID = authenticator_id(Config),
case lists:keymember(AuthenticatorID, #authenticator.id, Authenticators) of
true ->
{error, {already_exists, {authenticator, AuthenticatorID}}};
false ->
case do_create_authenticator(AuthenticatorID, Config, Providers) of
{ok, Authenticator} ->
NAuthenticators =
Authenticators ++
[Authenticator#authenticator{enable = maps:get(enable, Config)}],
true = ets:insert(?CHAINS_TAB,
Chain#chain{authenticators = NAuthenticators}),
{ok, serialize_authenticator(Authenticator)};
{error, Reason} ->
{error, Reason}
end
end.
do_authenticate([], _) ->
{stop, {error, not_authorized}};
do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
try Provider:authenticate(Credential, State) of
ignore ->
do_authenticate(More, Credential);
Result ->
%% {ok, Extra}
%% {ok, Extra, AuthData}
%% {continue, AuthCache}
%% {continue, AuthData, AuthCache}
%% {error, Reason}
{stop, Result}
catch
Class:Reason:Stacktrace ->
?SLOG(warning, #{msg => "unexpected_error_in_authentication",
exception => Class,
reason => Reason,
stacktrace => Stacktrace,
authenticator => ID}),
do_authenticate(More, Credential)
end.
reply(Reply, State) ->
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
create_chain_table() ->
try
_ = ets:new(?CHAINS_TAB, [named_table, set, public,
@ -631,25 +651,35 @@ maybe_unhook(#{hooked := true} = State) ->
maybe_unhook(State) ->
State.
do_create_authenticator(ChainName, AuthenticatorID, #{enable := Enable} = Config, Providers) ->
do_create_authenticator(AuthenticatorID, #{enable := Enable} = Config, Providers) ->
case maps:get(authn_type(Config), Providers, undefined) of
undefined ->
{error, no_available_provider};
Provider ->
Unique = unique(ChainName, AuthenticatorID, ?VER_1),
case Provider:create(Config#{'_unique' => Unique}) of
case Provider:create(AuthenticatorID, Config) of
{ok, State} ->
Authenticator = #authenticator{id = AuthenticatorID,
provider = Provider,
enable = Enable,
state = switch_version(State)},
state = State},
{ok, Authenticator};
{error, Reason} ->
{error, Reason}
end
end.
do_delete_authenticator(#authenticator{provider = Provider, state = State}) ->
do_delete_authenticators(MatchFun, #chain{authenticators = Authenticators} = Chain) ->
{Matching, Others} = lists:partition(MatchFun, Authenticators),
MatchingIDs = lists:map(
fun(#authenticator{id = ID}) -> ID end,
Matching),
ok = lists:foreach(fun do_destroy_authenticator/1, Matching),
true = ets:insert(?CHAINS_TAB, Chain#chain{authenticators = Others}),
MatchingIDs.
do_destroy_authenticator(#authenticator{provider = Provider, state = State}) ->
_ = Provider:destroy(State),
ok.
@ -722,17 +752,6 @@ serialize_authenticator(#authenticator{id = ID,
, state => State
}.
unique(ChainName, AuthenticatorID, Version) ->
NChainName = atom_to_binary(ChainName),
<<NChainName/binary, "/", AuthenticatorID/binary, ":", Version/binary>>.
switch_version(State = #{version := ?VER_1}) ->
State#{version := ?VER_2};
switch_version(State = #{version := ?VER_2}) ->
State#{version := ?VER_1};
switch_version(State) ->
State#{version => ?VER_2}.
authn_type(#{mechanism := Mechanism, backend := Backend}) ->
{Mechanism, Backend};
authn_type(#{mechanism := Mechanism}) ->

View File

@ -77,6 +77,8 @@
priority :: integer()
}).
-type(callback() :: #callback{}).
-record(hook, {
name :: hookpoint(),
callbacks :: list(#callback{})
@ -112,7 +114,7 @@ callback_priority(#callback{priority= P}) -> P.
%%--------------------------------------------------------------------
%% @doc Register a callback
-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)).
-spec(add(hookpoint(), action() | callback()) -> ok_or_error(already_exists)).
add(HookPoint, Callback) when is_record(Callback, callback) ->
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
@ -131,7 +133,7 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Like add/2, it register a callback, discard 'already_exists' error.
-spec(put(hookpoint(), action() | #callback{}) -> ok).
-spec(put(hookpoint(), action() | callback()) -> ok).
put(HookPoint, Callback) when is_record(Callback, callback) ->
case add(HookPoint, Callback) of
ok -> ok;
@ -211,7 +213,7 @@ safe_execute({M, F, A}, Args) ->
exception => Error,
reason => Reason,
stacktrace => Stacktrace,
failed_call => {M, F, A}
failed_call => {M, F, Args ++ A}
})
end.
@ -220,7 +222,7 @@ execute({M, F, A}, Args) ->
erlang:apply(M, F, Args ++ A).
%% @doc Lookup callbacks.
-spec(lookup(hookpoint()) -> [#callback{}]).
-spec(lookup(hookpoint()) -> [callback()]).
lookup(HookPoint) ->
case ets:lookup(?TAB, HookPoint) of
[#hook{callbacks = Callbacks}] ->
@ -292,10 +294,10 @@ add_callback(C, Callbacks) ->
add_callback(C, Callbacks, []).
add_callback(C, [], Acc) ->
lists:reverse([C|Acc]);
add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More], Acc)
lists:reverse([C | Acc]);
add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2} | More], Acc)
when P1 =< P2 ->
add_callback(C1, More, [C2|Acc]);
add_callback(C1, More, [C2 | Acc]);
add_callback(C1, More, Acc) ->
lists:append(lists:reverse(Acc), [C1 | More]).
@ -310,4 +312,3 @@ del_callback(Action = {M, F}, [#callback{action = {M, F, _A}} | Callbacks], Acc)
del_callback(Action, Callbacks, Acc);
del_callback(Action, [Callback | Callbacks], Acc) ->
del_callback(Action, Callbacks, [Callback | Acc]).

View File

@ -22,7 +22,7 @@
-include("logger.hrl").
-type(hash_type() :: plain | md5 | sha | sha256 | pbkdf2 | bcrypt).
-type(hash_type() :: plain | md5 | sha | sha256 | sha512 | pbkdf2 | bcrypt).
-export_type([hash_type/0]).
@ -95,4 +95,3 @@ hexstring(<<X:256/big-unsigned-integer>>) ->
iolist_to_binary(io_lib:format("~64.16.0b", [X]));
hexstring(<<X:512/big-unsigned-integer>>) ->
iolist_to_binary(io_lib:format("~128.16.0b", [X])).

View File

@ -28,7 +28,7 @@
-export([ roots/0, fields/1 ]).
-export([ create/1
-export([ create/2
, update/2
, authenticate/2
, destroy/1
@ -70,7 +70,7 @@ check_config(C) ->
#{atom_key => true}),
R.
create(_Config) ->
create(_AuthenticatorID, _Config) ->
{ok, #{mark => 1}}.
update(_Config, _State) ->
@ -103,7 +103,9 @@ end_per_testcase(Case, Config) ->
_ = ?MODULE:Case({'end', Config}),
ok.
t_chain({_, Config}) -> Config;
t_chain(Config) when is_list(Config) ->
% CRUD of authentication chain
ChainName = 'test',
@ -118,9 +120,11 @@ t_chain(Config) when is_list(Config) ->
?assertMatch({error, {not_found, {chain, ChainName}}}, ?AUTHN:lookup_chain(ChainName)),
ok.
t_authenticator({'init', Config}) ->
[{"auth1", {'password-based', 'built-in-database'}},
{"auth2", {'password-based', mysql}} | Config];
t_authenticator(Config) when is_list(Config) ->
ChainName = 'test',
AuthenticatorConfig1 = #{mechanism => 'password-based',
@ -128,23 +132,43 @@ t_authenticator(Config) when is_list(Config) ->
enable => true},
% Create an authenticator when the authentication chain does not exist
?assertEqual({error, {not_found, {chain, ChainName}}}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertEqual(
{error, {not_found, {chain, ChainName}}},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?AUTHN:create_chain(ChainName),
% Create an authenticator when the provider does not exist
?assertEqual({error, no_available_provider}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertEqual(
{error, no_available_provider},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
AuthNType1 = ?config("auth1"),
register_provider(AuthNType1, ?MODULE),
ID1 = <<"password-based:built-in-database">>,
% CRUD of authencaticator
?assertMatch({ok, #{id := ID1, state := #{mark := 1, version := <<"2">>}}}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertMatch(
{ok, #{id := ID1, state := #{mark := 1}}},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertMatch({ok, #{id := ID1}}, ?AUTHN:lookup_authenticator(ChainName, ID1)),
?assertMatch({ok, [#{id := ID1}]}, ?AUTHN:list_authenticators(ChainName)),
?assertEqual({error, {already_exists, {authenticator, ID1}}}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertMatch({ok, #{id := ID1, state := #{mark := 2, version := <<"1">>}}}, ?AUTHN:update_authenticator(ChainName, ID1, AuthenticatorConfig1)),
?assertEqual(
{error, {already_exists, {authenticator, ID1}}},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertMatch(
{ok, #{id := ID1, state := #{mark := 2}}},
?AUTHN:update_authenticator(ChainName, ID1, AuthenticatorConfig1)),
?assertEqual(ok, ?AUTHN:delete_authenticator(ChainName, ID1)),
?assertEqual({error, {not_found, {authenticator, ID1}}}, ?AUTHN:update_authenticator(ChainName, ID1, AuthenticatorConfig1)),
?assertEqual(
{error, {not_found, {authenticator, ID1}}},
?AUTHN:update_authenticator(ChainName, ID1, AuthenticatorConfig1)),
?assertMatch({ok, []}, ?AUTHN:list_authenticators(ChainName)),
% Multiple authenticators exist at the same time
@ -154,25 +178,37 @@ t_authenticator(Config) when is_list(Config) ->
AuthenticatorConfig2 = #{mechanism => 'password-based',
backend => mysql,
enable => true},
?assertMatch({ok, #{id := ID1}}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertMatch({ok, #{id := ID2}}, ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig2)),
?assertMatch(
{ok, #{id := ID1}},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)),
?assertMatch(
{ok, #{id := ID2}},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig2)),
% Move authenticator
?assertMatch({ok, [#{id := ID1}, #{id := ID2}]}, ?AUTHN:list_authenticators(ChainName)),
?assertEqual(ok, ?AUTHN:move_authenticator(ChainName, ID2, top)),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(ChainName)),
?assertEqual(ok, ?AUTHN:move_authenticator(ChainName, ID2, bottom)),
?assertMatch({ok, [#{id := ID1}, #{id := ID2}]}, ?AUTHN:list_authenticators(ChainName)),
?assertEqual(ok, ?AUTHN:move_authenticator(ChainName, ID2, {before, ID1})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(ChainName));
t_authenticator({'end', Config}) ->
?AUTHN:delete_chain(test),
?AUTHN:deregister_providers([?config("auth1"), ?config("auth2")]),
ok.
t_authenticate({init, Config}) ->
[{listener_id, 'tcp:default'},
{authn_type, {'password-based', 'built-in-database'}} | Config];
t_authenticate(Config) when is_list(Config) ->
ListenerID = ?config(listener_id),
AuthNType = ?config(authn_type),
@ -190,13 +226,21 @@ t_authenticate(Config) when is_list(Config) ->
enable => true},
?AUTHN:create_chain(ListenerID),
?assertMatch({ok, _}, ?AUTHN:create_authenticator(ListenerID, AuthenticatorConfig)),
?assertEqual({ok, #{is_superuser => true}}, emqx_access_control:authenticate(ClientInfo)),
?assertEqual({error, bad_username_or_password}, emqx_access_control:authenticate(ClientInfo#{username => <<"bad">>}));
?assertEqual(
{ok, #{is_superuser => true}},
emqx_access_control:authenticate(ClientInfo)),
?assertEqual(
{error, bad_username_or_password},
emqx_access_control:authenticate(ClientInfo#{username => <<"bad">>}));
t_authenticate({'end', Config}) ->
?AUTHN:delete_chain(?config(listener_id)),
?AUTHN:deregister_provider(?config(authn_type)),
ok.
t_update_config({init, Config}) ->
Global = 'mqtt:global',
AuthNType1 = {'password-based', 'built-in-database'},
@ -204,6 +248,7 @@ t_update_config({init, Config}) ->
[{global, Global},
{"auth1", AuthNType1},
{"auth2", AuthNType2} | Config];
t_update_config(Config) when is_list(Config) ->
emqx_config_handler:add_handler([authentication], emqx_authentication),
ok = register_provider(?config("auth1"), ?MODULE),
@ -219,46 +264,113 @@ t_update_config(Config) when is_list(Config) ->
ID2 = <<"password-based:mysql">>,
?assertMatch({ok, []}, ?AUTHN:list_chains()),
?assertMatch({ok, _}, update_config([authentication], {create_authenticator, Global, AuthenticatorConfig1})),
?assertMatch({ok, #{id := ID1, state := #{mark := 1}}}, ?AUTHN:lookup_authenticator(Global, ID1)),
?assertMatch({ok, _}, update_config([authentication], {create_authenticator, Global, AuthenticatorConfig2})),
?assertMatch({ok, #{id := ID2, state := #{mark := 1}}}, ?AUTHN:lookup_authenticator(Global, ID2)),
?assertMatch(
{ok, _},
update_config([authentication], {create_authenticator, Global, AuthenticatorConfig1})),
?assertMatch({ok, _}, update_config([authentication], {update_authenticator, Global, ID1, AuthenticatorConfig1#{<<"enable">> => false}})),
?assertMatch({ok, #{id := ID1, state := #{mark := 2}}}, ?AUTHN:lookup_authenticator(Global, ID1)),
?assertMatch(
{ok, #{id := ID1, state := #{mark := 1}}},
?AUTHN:lookup_authenticator(Global, ID1)),
?assertMatch(
{ok, _},
update_config([authentication], {create_authenticator, Global, AuthenticatorConfig2})),
?assertMatch(
{ok, #{id := ID2, state := #{mark := 1}}},
?AUTHN:lookup_authenticator(Global, ID2)),
?assertMatch(
{ok, _},
update_config([authentication],
{update_authenticator,
Global,
ID1,
AuthenticatorConfig1#{<<"enable">> => false}
})),
?assertMatch(
{ok, #{id := ID1, state := #{mark := 2}}},
?AUTHN:lookup_authenticator(Global, ID1)),
?assertMatch(
{ok, _},
update_config([authentication], {move_authenticator, Global, ID2, top})),
?assertMatch({ok, _}, update_config([authentication], {move_authenticator, Global, ID2, top})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(Global)),
?assertMatch({ok, _}, update_config([authentication], {delete_authenticator, Global, ID1})),
?assertEqual({error, {not_found, {authenticator, ID1}}}, ?AUTHN:lookup_authenticator(Global, ID1)),
?assertEqual(
{error, {not_found, {authenticator, ID1}}},
?AUTHN:lookup_authenticator(Global, ID1)),
?assertMatch({ok, _}, update_config([authentication], {delete_authenticator, Global, ID2})),
?assertEqual({error, {not_found, {authenticator, ID2}}}, ?AUTHN:lookup_authenticator(Global, ID2)),
?assertMatch(
{ok, _},
update_config([authentication], {delete_authenticator, Global, ID2})),
?assertEqual(
{error, {not_found, {authenticator, ID2}}},
?AUTHN:lookup_authenticator(Global, ID2)),
ListenerID = 'tcp:default',
ConfKeyPath = [listeners, tcp, default, authentication],
?assertMatch({ok, _}, update_config(ConfKeyPath, {create_authenticator, ListenerID, AuthenticatorConfig1})),
?assertMatch({ok, #{id := ID1, state := #{mark := 1}}}, ?AUTHN:lookup_authenticator(ListenerID, ID1)),
?assertMatch({ok, _}, update_config(ConfKeyPath, {create_authenticator, ListenerID, AuthenticatorConfig2})),
?assertMatch({ok, #{id := ID2, state := #{mark := 1}}}, ?AUTHN:lookup_authenticator(ListenerID, ID2)),
?assertMatch(
{ok, _},
update_config(ConfKeyPath,
{create_authenticator, ListenerID, AuthenticatorConfig1})),
?assertMatch({ok, _}, update_config(ConfKeyPath, {update_authenticator, ListenerID, ID1, AuthenticatorConfig1#{<<"enable">> => false}})),
?assertMatch({ok, #{id := ID1, state := #{mark := 2}}}, ?AUTHN:lookup_authenticator(ListenerID, ID1)),
?assertMatch(
{ok, #{id := ID1, state := #{mark := 1}}},
?AUTHN:lookup_authenticator(ListenerID, ID1)),
?assertMatch({ok, _}, update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, top})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(ListenerID)),
?assertMatch(
{ok, _},
update_config(ConfKeyPath,
{create_authenticator, ListenerID, AuthenticatorConfig2})),
?assertMatch(
{ok, #{id := ID2, state := #{mark := 1}}},
?AUTHN:lookup_authenticator(ListenerID, ID2)),
?assertMatch(
{ok, _},
update_config(ConfKeyPath,
{update_authenticator,
ListenerID,
ID1,
AuthenticatorConfig1#{<<"enable">> => false}
})),
?assertMatch(
{ok, #{id := ID1, state := #{mark := 2}}},
?AUTHN:lookup_authenticator(ListenerID, ID1)),
?assertMatch(
{ok, _},
update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, top})),
?assertMatch(
{ok, [#{id := ID2}, #{id := ID1}]},
?AUTHN:list_authenticators(ListenerID)),
?assertMatch(
{ok, _},
update_config(ConfKeyPath, {delete_authenticator, ListenerID, ID1})),
?assertEqual(
{error, {not_found, {authenticator, ID1}}},
?AUTHN:lookup_authenticator(ListenerID, ID1));
?assertMatch({ok, _}, update_config(ConfKeyPath, {delete_authenticator, ListenerID, ID1})),
?assertEqual({error, {not_found, {authenticator, ID1}}}, ?AUTHN:lookup_authenticator(ListenerID, ID1));
t_update_config({'end', Config}) ->
?AUTHN:delete_chain(?config(global)),
?AUTHN:deregister_providers([?config("auth1"), ?config("auth2")]),
ok.
t_restart({'init', Config}) -> Config;
t_restart(Config) when is_list(Config) ->
?assertEqual({ok, []}, ?AUTHN:list_chain_names()),
@ -274,7 +386,9 @@ t_restart({'end', _Config}) ->
?AUTHN:delete_chain(test_chain),
ok.
t_convert_certs({_, Config}) -> Config;
t_convert_certs(Config) when is_list(Config) ->
Global = <<"mqtt:global">>,
Certs = certs([ {<<"keyfile">>, "key.pem"}
@ -288,7 +402,11 @@ t_convert_certs(Config) when is_list(Config) ->
Certs2 = certs([ {<<"keyfile">>, "key.pem"}
, {<<"certfile">>, "cert.pem"}
]),
#{<<"ssl">> := NCerts2} = convert_certs(CertsDir, #{<<"ssl">> => Certs2}, #{<<"ssl">> => NCerts}),
#{<<"ssl">> := NCerts2} = convert_certs(
CertsDir,
#{<<"ssl">> => Certs2}, #{<<"ssl">> => NCerts}),
?assertEqual(maps:get(<<"keyfile">>, NCerts), maps:get(<<"keyfile">>, NCerts2)),
?assertEqual(maps:get(<<"certfile">>, NCerts), maps:get(<<"certfile">>, NCerts2)),
@ -296,7 +414,11 @@ t_convert_certs(Config) when is_list(Config) ->
, {<<"certfile">>, "client-cert.pem"}
, {<<"cacertfile">>, "cacert.pem"}
]),
#{<<"ssl">> := NCerts3} = convert_certs(CertsDir, #{<<"ssl">> => Certs3}, #{<<"ssl">> => NCerts2}),
#{<<"ssl">> := NCerts3} = convert_certs(
CertsDir,
#{<<"ssl">> => Certs3}, #{<<"ssl">> => NCerts2}),
?assertNotEqual(maps:get(<<"keyfile">>, NCerts2), maps:get(<<"keyfile">>, NCerts3)),
?assertNotEqual(maps:get(<<"certfile">>, NCerts2), maps:get(<<"certfile">>, NCerts3)),

View File

@ -58,7 +58,8 @@ initialize() ->
chain_configs()).
deinitialize() ->
ok = ?AUTHN:deregister_providers(provider_types()).
ok = ?AUTHN:deregister_providers(provider_types()),
ok = emqx_authn_utils:cleanup_resources().
chain_configs() ->
[global_chain_config() | listener_chain_configs()].

View File

@ -23,8 +23,13 @@
, hash/4
, gen_salt/0
, bin/1
, ensure_apps_started/1
, cleanup_resources/0
, make_resource_id/1
]).
-define(RESOURCE_GROUP, <<"emqx_authn">>).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
@ -62,22 +67,36 @@ check_password(undefined, _Selected, _State) ->
check_password(Password,
#{<<"password_hash">> := Hash},
#{password_hash_algorithm := bcrypt}) ->
case {ok, to_list(Hash)} =:= bcrypt:hashpw(Password, Hash) of
true -> ok;
false -> {error, bad_username_or_password}
case emqx_passwd:hash(bcrypt, {Hash, Password}) of
Hash -> ok;
_ ->
{error, bad_username_or_password}
end;
check_password(Password,
#{<<"password_hash">> := Hash} = Selected,
#{password_hash_algorithm := Algorithm,
salt_position := SaltPosition}) ->
Salt = maps:get(<<"salt">>, Selected, <<>>),
case Hash =:= hash(Algorithm, Password, Salt, SaltPosition) of
true -> ok;
false -> {error, bad_username_or_password}
case hash(Algorithm, Password, Salt, SaltPosition) of
Hash -> ok;
_ ->
{error, bad_username_or_password}
end.
is_superuser(Selected) ->
#{is_superuser => maps:get(<<"is_superuser">>, Selected, false)}.
is_superuser(#{<<"is_superuser">> := <<"">>}) ->
#{is_superuser => false};
is_superuser(#{<<"is_superuser">> := <<"0">>}) ->
#{is_superuser => false};
is_superuser(#{<<"is_superuser">> := _}) ->
#{is_superuser => true};
is_superuser(#{}) ->
#{is_superuser => false}.
ensure_apps_started(bcrypt) ->
{ok, _} = application:ensure_all_started(bcrypt),
ok;
ensure_apps_started(_) ->
ok.
hash(Algorithm, Password, Salt, prefix) ->
emqx_passwd:hash(Algorithm, <<Salt/binary, Password/binary>>);
@ -92,6 +111,15 @@ bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(L) when is_list(L) -> list_to_binary(L);
bin(X) -> X.
cleanup_resources() ->
lists:foreach(
fun emqx_resource:remove_local/1,
emqx_resource:list_group_instances(?RESOURCE_GROUP)).
make_resource_id(Name) ->
NameBin = bin(Name),
emqx_resource:generate_id(?RESOURCE_GROUP, NameBin).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
@ -100,7 +128,3 @@ convert_to_sql_param(undefined) ->
null;
convert_to_sql_param(V) ->
bin(V).
to_list(L) when is_list(L) -> L;
to_list(L) when is_binary(L) -> binary_to_list(L);
to_list(X) -> X.

View File

@ -17,6 +17,7 @@
-module(emqx_enhanced_authn_scram_mnesia).
-include("emqx_authn.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
@ -28,7 +29,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -46,6 +47,8 @@
-define(TAB, ?MODULE).
-define(FORMAT_FUN, {?MODULE, format_user_info}).
-type(user_group() :: binary()).
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
@ -58,6 +61,8 @@
, is_superuser
}).
-reflect_type([user_group/0]).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
@ -102,17 +107,17 @@ iteration_count(_) -> undefined.
refs() ->
[hoconsc:ref(?MODULE, config)].
create(#{ algorithm := Algorithm
, iteration_count := IterationCount
, '_unique' := Unique
}) ->
State = #{user_group => Unique,
create(AuthenticatorID,
#{algorithm := Algorithm,
iteration_count := IterationCount}) ->
State = #{user_group => AuthenticatorID,
algorithm => Algorithm,
iteration_count => IterationCount},
{ok, State}.
update(Config, #{user_group := Unique}) ->
create(Config#{'_unique' => Unique}).
update(Config, #{user_group := ID}) ->
create(ID, Config).
authenticate(#{auth_method := AuthMethod,
auth_data := AuthData,
@ -132,9 +137,12 @@ authenticate(_Credential, _State) ->
ignore.
destroy(#{user_group := UserGroup}) ->
MatchSpec = ets:fun2ms(
fun(#user_info{user_id = {Group, _}} = User) when Group =:= UserGroup ->
User
end),
trans(
fun() ->
MatchSpec = [{{user_info, {UserGroup, '_'}, '_', '_', '_', '_'}, [], ['$_']}],
ok = lists:foreach(fun(UserInfo) ->
mnesia:delete_object(?TAB, UserInfo, write)
end, mnesia:select(?TAB, MatchSpec, write))

View File

@ -30,7 +30,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -113,24 +113,25 @@ refs() ->
, hoconsc:ref(?MODULE, post)
].
create(#{ method := Method
, url := URL
, headers := Headers
, body := Body
, request_timeout := RequestTimeout
, '_unique' := Unique
} = Config) ->
create(_AuthenticatorID, Config) ->
create(Config).
create(#{method := Method,
url := URL,
headers := Headers,
body := Body,
request_timeout := RequestTimeout} = Config) ->
#{path := Path,
query := Query} = URIMap = parse_url(URL),
State = #{ method => Method
, path => Path
, base_query => cow_qs:parse_qs(list_to_binary(Query))
, headers => maps:to_list(Headers)
, body => maps:to_list(Body)
, request_timeout => RequestTimeout
, '_unique' => Unique
},
case emqx_resource:create_local(Unique,
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
State = #{method => Method,
path => Path,
base_query => cow_qs:parse_qs(list_to_binary(Query)),
headers => maps:to_list(Headers),
body => maps:to_list(Body),
request_timeout => RequestTimeout,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId,
emqx_connector_http,
Config#{base_url => maps:remove(query, URIMap),
pool_type => random}) of
@ -153,11 +154,11 @@ update(Config, State) ->
authenticate(#{auth_method := _}, _) ->
ignore;
authenticate(Credential, #{'_unique' := Unique,
authenticate(Credential, #{resource_id := ResourceId,
method := Method,
request_timeout := RequestTimeout} = State) ->
Request = generate_request(Credential, State),
case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of
case emqx_resource:query(ResourceId, {Method, Request, RequestTimeout}) of
{ok, 204, _Headers} -> {ok, #{is_superuser => false}};
{ok, 200, Headers, Body} ->
ContentType = proplists:get_value(<<"content-type">>, Headers, <<"application/json">>),
@ -171,11 +172,11 @@ authenticate(Credential, #{'_unique' := Unique,
end;
{error, Reason} ->
?SLOG(error, #{msg => "http_server_query_failed",
resource => Unique,
resource => ResourceId,
reason => Reason}),
ignore;
Other ->
Output = may_append_body(#{resource => Unique}, Other),
Output = may_append_body(#{resource => ResourceId}, Other),
case erlang:element(2, Other) of
Code5xx when Code5xx >= 500 andalso Code5xx < 600 ->
?SLOG(error, Output#{msg => "http_server_error",
@ -192,8 +193,8 @@ authenticate(Credential, #{'_unique' := Unique,
end
end.
destroy(#{'_unique' := Unique}) ->
_ = emqx_resource:remove_local(Unique),
destroy(#{resource_id := ResourceId}) ->
_ = emqx_resource:remove_local(ResourceId),
ok.
%%--------------------------------------------------------------------

View File

@ -27,7 +27,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -139,18 +139,23 @@ refs() ->
, hoconsc:ref(?MODULE, 'jwks')
].
create(_AuthenticatorID, Config) ->
create(Config).
create(#{verify_claims := VerifyClaims} = Config) ->
create2(Config#{verify_claims => handle_verify_claims(VerifyClaims)}).
update(#{use_jwks := false} = Config, #{jwk := Connector})
update(#{use_jwks := false} = Config,
#{jwk := Connector})
when is_pid(Connector) ->
_ = emqx_authn_jwks_connector:stop(Connector),
create(Config);
update(#{use_jwks := false} = Config, _) ->
update(#{use_jwks := false} = Config, _State) ->
create(Config);
update(#{use_jwks := true} = Config, #{jwk := Connector} = State)
update(#{use_jwks := true} = Config,
#{jwk := Connector} = State)
when is_pid(Connector) ->
ok = emqx_authn_jwks_connector:update(Connector, Config),
case maps:get(verify_cliams, Config, undefined) of
@ -160,7 +165,7 @@ update(#{use_jwks := true} = Config, #{jwk := Connector} = State)
{ok, State#{verify_claims => handle_verify_claims(VerifyClaims)}}
end;
update(#{use_jwks := true} = Config, _) ->
update(#{use_jwks := true} = Config, _State) ->
create(Config).
authenticate(#{auth_method := _}, _) ->

View File

@ -17,6 +17,7 @@
-module(emqx_authn_mnesia).
-include("emqx_authn.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
@ -28,7 +29,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -45,8 +46,7 @@
-export([format_user_info/1]).
-type user_id_type() :: clientid | username.
-type user_group() :: {binary(), binary()}.
-type user_group() :: binary().
-type user_id() :: binary().
-record(user_info,
@ -56,7 +56,7 @@
, is_superuser :: boolean()
}).
-reflect_type([ user_id_type/0 ]).
-reflect_type([user_id_type/0]).
-export([mnesia/1]).
@ -123,29 +123,28 @@ salt_rounds(_) -> undefined.
refs() ->
[hoconsc:ref(?MODULE, config)].
create(#{ user_id_type := Type
, password_hash_algorithm := #{name := bcrypt,
salt_rounds := SaltRounds}
, '_unique' := Unique
}) ->
{ok, _} = application:ensure_all_started(bcrypt),
State = #{user_group => Unique,
create(AuthenticatorID,
#{user_id_type := Type,
password_hash_algorithm := #{name := bcrypt,
salt_rounds := SaltRounds}}) ->
ok = emqx_authn_utils:ensure_apps_started(bcrypt),
State = #{user_group => AuthenticatorID,
user_id_type => Type,
password_hash_algorithm => bcrypt,
salt_rounds => SaltRounds},
{ok, State};
create(#{ user_id_type := Type
, password_hash_algorithm := #{name := Name}
, '_unique' := Unique
}) ->
State = #{user_group => Unique,
create(AuthenticatorID,
#{user_id_type := Type,
password_hash_algorithm := #{name := Name}}) ->
ok = emqx_authn_utils:ensure_apps_started(Name),
State = #{user_group => AuthenticatorID,
user_id_type => Type,
password_hash_algorithm => Name},
{ok, State}.
update(Config, #{user_group := Unique}) ->
create(Config#{'_unique' => Unique}).
update(Config, #{user_group := ID}) ->
create(ID, Config).
authenticate(#{auth_method := _}, _) ->
ignore;
@ -170,10 +169,14 @@ authenticate(#{password := Password} = Credential,
destroy(#{user_group := UserGroup}) ->
trans(
fun() ->
MatchSpec = [{{user_info, {UserGroup, '_'}, '_', '_', '_'}, [], ['$_']}],
ok = lists:foreach(fun delete_user2/1, mnesia:select(?TAB, MatchSpec, write))
end).
fun() ->
ok = lists:foreach(
fun(User) ->
mnesia:delete_object(?TAB, User, write)
end,
mnesia:select(?TAB, group_match_spec(UserGroup), write))
end).
import_users(Filename0, State) ->
Filename = to_binary(Filename0),
@ -246,8 +249,7 @@ lookup_user(UserID, #{user_group := UserGroup}) ->
end.
list_users(PageParams, #{user_group := UserGroup}) ->
MatchSpec = [{{user_info, {UserGroup, '_'}, '_', '_', '_'}, [], ['$_']}],
{ok, emqx_mgmt_api:paginate(?TAB, MatchSpec, PageParams, ?FORMAT_FUN)}.
{ok, emqx_mgmt_api:paginate(?TAB, group_match_spec(UserGroup), PageParams, ?FORMAT_FUN)}.
%%------------------------------------------------------------------------------
%% Internal functions
@ -374,9 +376,6 @@ insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) ->
is_superuser = IsSuperuser},
mnesia:write(?TAB, UserInfo, write).
delete_user2(UserInfo) ->
mnesia:delete_object(?TAB, UserInfo, write).
%% TODO: Support other type
get_user_identity(#{username := Username}, username) ->
Username;
@ -401,3 +400,9 @@ to_binary(L) when is_list(L) ->
format_user_info(#user_info{user_id = {_, UserID}, is_superuser = IsSuperuser}) ->
#{user_id => UserID, is_superuser => IsSuperuser}.
group_match_spec(UserGroup) ->
ets:fun2ms(
fun(#user_info{user_id = {Group, _}} = User) when Group =:= UserGroup ->
User
end).

View File

@ -29,7 +29,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -102,19 +102,24 @@ refs() ->
, hoconsc:ref(?MODULE, 'sharded-cluster')
].
create(#{ selector := Selector
, '_unique' := Unique
} = Config) ->
create(_AuthenticatorID, Config) ->
create(Config).
create(#{selector := Selector} = Config) ->
NSelector = parse_selector(Selector),
State = maps:with([ collection
, password_hash_field
, salt_field
, is_superuser_field
, password_hash_algorithm
, salt_position
, '_unique'], Config),
NState = State#{selector => NSelector},
case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of
State = maps:with(
[collection,
password_hash_field,
salt_field,
is_superuser_field,
password_hash_algorithm,
salt_position],
Config),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
NState = State#{
selector => NSelector,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_mongo, Config) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->
@ -135,17 +140,16 @@ update(Config, State) ->
authenticate(#{auth_method := _}, _) ->
ignore;
authenticate(#{password := Password} = Credential,
#{ collection := Collection
, selector := Selector0
, '_unique' := Unique
} = State) ->
#{collection := Collection,
selector := Selector0,
resource_id := ResourceId} = State) ->
Selector1 = replace_placeholders(Selector0, Credential),
Selector2 = normalize_selector(Selector1),
case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of
case emqx_resource:query(ResourceId, {find_one, Collection, Selector2, #{}}) of
undefined -> ignore;
{error, Reason} ->
?SLOG(error, #{msg => "mongodb_query_failed",
resource => Unique,
resource => ResourceId,
reason => Reason}),
ignore;
Doc ->
@ -154,7 +158,7 @@ authenticate(#{password := Password} = Credential,
{ok, #{is_superuser => is_superuser(Doc, State)}};
{error, {cannot_find_password_hash_field, PasswordHashField}} ->
?SLOG(error, #{msg => "cannot_find_password_hash_field",
resource => Unique,
resource => ResourceId,
password_hash_field => PasswordHashField}),
ignore;
{error, Reason} ->
@ -162,8 +166,8 @@ authenticate(#{password := Password} = Credential,
end
end.
destroy(#{'_unique' := Unique}) ->
_ = emqx_resource:remove_local(Unique),
destroy(#{resource_id := ResourceId}) ->
_ = emqx_resource:remove_local(ResourceId),
ok.
%%------------------------------------------------------------------------------

View File

@ -29,7 +29,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -76,20 +76,23 @@ query_timeout(_) -> undefined.
refs() ->
[hoconsc:ref(?MODULE, config)].
create(#{ password_hash_algorithm := Algorithm
, salt_position := SaltPosition
, query := Query0
, query_timeout := QueryTimeout
, '_unique' := Unique
create(_AuthenticatorID, Config) ->
create(Config).
create(#{password_hash_algorithm := Algorithm,
salt_position := SaltPosition,
query := Query0,
query_timeout := QueryTimeout
} = Config) ->
{Query, PlaceHolders} = parse_query(Query0),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
State = #{password_hash_algorithm => Algorithm,
salt_position => SaltPosition,
query => Query,
placeholders => PlaceHolders,
query_timeout => QueryTimeout,
'_unique' => Unique},
case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_mysql, Config) of
{ok, already_created} ->
{ok, State};
{ok, _} ->
@ -113,9 +116,9 @@ authenticate(#{password := Password} = Credential,
#{placeholders := PlaceHolders,
query := Query,
query_timeout := Timeout,
'_unique' := Unique} = State) ->
resource_id := ResourceId} = State) ->
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of
case emqx_resource:query(ResourceId, {sql, Query, Params, Timeout}) of
{ok, _Columns, []} -> ignore;
{ok, Columns, [Row | _]} ->
Selected = maps:from_list(lists:zip(Columns, Row)),
@ -127,13 +130,13 @@ authenticate(#{password := Password} = Credential,
end;
{error, Reason} ->
?SLOG(error, #{msg => "mysql_query_failed",
resource => Unique,
resource => ResourceId,
reason => Reason}),
ignore
end.
destroy(#{'_unique' := Unique}) ->
_ = emqx_resource:remove_local(Unique),
destroy(#{resource_id := ResourceId}) ->
_ = emqx_resource:remove_local(ResourceId),
ok.
%%------------------------------------------------------------------------------

View File

@ -30,7 +30,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -77,18 +77,20 @@ query(_) -> undefined.
refs() ->
[hoconsc:ref(?MODULE, config)].
create(#{ query := Query0
, password_hash_algorithm := Algorithm
, salt_position := SaltPosition
, '_unique' := Unique
} = Config) ->
create(_AuthenticatorID, Config) ->
create(Config).
create(#{query := Query0,
password_hash_algorithm := Algorithm,
salt_position := SaltPosition} = Config) ->
{Query, PlaceHolders} = parse_query(Query0),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
State = #{query => Query,
placeholders => PlaceHolders,
password_hash_algorithm => Algorithm,
salt_position => SaltPosition,
'_unique' => Unique},
case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config) of
{ok, already_created} ->
{ok, State};
{ok, _} ->
@ -111,9 +113,9 @@ authenticate(#{auth_method := _}, _) ->
authenticate(#{password := Password} = Credential,
#{query := Query,
placeholders := PlaceHolders,
'_unique' := Unique} = State) ->
resource_id := ResourceId} = State) ->
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
case emqx_resource:query(Unique, {sql, Query, Params}) of
case emqx_resource:query(ResourceId, {sql, Query, Params}) of
{ok, _Columns, []} -> ignore;
{ok, Columns, [Row | _]} ->
NColumns = [Name || #column{name = Name} <- Columns],
@ -126,13 +128,13 @@ authenticate(#{password := Password} = Credential,
end;
{error, Reason} ->
?SLOG(error, #{msg => "postgresql_query_failed",
resource => Unique,
resource => ResourceId,
reason => Reason}),
ignore
end.
destroy(#{'_unique' := Unique}) ->
_ = emqx_resource:remove_local(Unique),
destroy(#{resource_id := ResourceId}) ->
_ = emqx_resource:remove_local(ResourceId),
ok.
%%------------------------------------------------------------------------------

View File

@ -29,7 +29,7 @@
]).
-export([ refs/0
, create/1
, create/2
, update/2
, authenticate/2
, destroy/1
@ -56,11 +56,11 @@ fields(sentinel) ->
common_fields() ++ emqx_connector_redis:fields(sentinel).
common_fields() ->
[ {mechanism, {enum, ['password-based']}}
, {backend, {enum, [redis]}}
, {query, fun query/1}
, {password_hash_algorithm, fun password_hash_algorithm/1}
, {salt_position, fun salt_position/1}
[{mechanism, {enum, ['password-based']}},
{backend, {enum, [redis]}},
{query, fun query/1},
{password_hash_algorithm, fun password_hash_algorithm/1},
{salt_position, fun salt_position/1}
] ++ emqx_authn_schema:common_fields().
query(type) -> string();
@ -84,16 +84,22 @@ refs() ->
, hoconsc:ref(?MODULE, sentinel)
].
create(#{ query := Query
, '_unique' := Unique
} = Config) ->
create(_AuthenticatorID, Config) ->
create(Config).
create(#{query := Query,
password_hash_algorithm := Algorithm} = Config) ->
try
NQuery = parse_query(Query),
State = maps:with([ password_hash_algorithm
, salt_position
, '_unique'], Config),
NState = State#{query => NQuery},
case emqx_resource:create_local(Unique, emqx_connector_redis, Config) of
ok = emqx_authn_utils:ensure_apps_started(Algorithm),
State = maps:with(
[password_hash_algorithm, salt_position],
Config),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
NState = State#{
query => NQuery,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_redis, Config) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->
@ -102,12 +108,12 @@ create(#{ query := Query
{error, Reason}
end
catch
error:{unsupported_query, Query} ->
error:{unsupported_query, _Query} ->
{error, {unsupported_query, Query}};
error:missing_password_hash ->
{error, missing_password_hash};
error:{unsupported_field, Field} ->
{error, {unsupported_field, Field}}
error:{unsupported_fields, Fields} ->
{error, {unsupported_fields, Fields}}
end.
update(Config, State) ->
@ -122,11 +128,10 @@ update(Config, State) ->
authenticate(#{auth_method := _}, _) ->
ignore;
authenticate(#{password := Password} = Credential,
#{ query := {Command, Key, Fields}
, '_unique' := Unique
} = State) ->
#{query := {Command, Key, Fields},
resource_id := ResourceId} = State) ->
NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))),
case emqx_resource:query(Unique, {cmd, [Command, NKey | Fields]}) of
case emqx_resource:query(ResourceId, {cmd, [Command, NKey | Fields]}) of
{ok, Values} ->
case merge(Fields, Values) of
#{<<"password_hash">> := _} = Selected ->
@ -138,18 +143,18 @@ authenticate(#{password := Password} = Credential,
end;
_ ->
?SLOG(error, #{msg => "cannot_find_password_hash_field",
resource => Unique}),
resource => ResourceId}),
ignore
end;
{error, Reason} ->
?SLOG(error, #{msg => "redis_query_failed",
resource => Unique,
resource => ResourceId,
reason => Reason}),
ignore
end.
destroy(#{'_unique' := Unique}) ->
_ = emqx_resource:remove_local(Unique),
destroy(#{resource_id := ResourceId}) ->
_ = emqx_resource:remove_local(ResourceId),
ok.
%%------------------------------------------------------------------------------
@ -169,20 +174,15 @@ parse_query(Query) ->
end.
check_fields(Fields) ->
check_fields(Fields, false).
HasPassHash = lists:member("password_hash", Fields),
KnownFields = ["password_hash", "salt", "is_superuser"],
UnknownFields = [F || F <- Fields, not lists:member(F, KnownFields)],
check_fields([], false) ->
error(missing_password_hash);
check_fields([], true) ->
ok;
check_fields(["password_hash" | More], false) ->
check_fields(More, true);
check_fields(["salt" | More], HasPassHash) ->
check_fields(More, HasPassHash);
check_fields(["is_superuser" | More], HasPassHash) ->
check_fields(More, HasPassHash);
check_fields([Field | _], _) ->
error({unsupported_field, Field}).
case {HasPassHash, UnknownFields} of
{true, []} -> ok;
{true, _} -> error({unsupported_fields, UnknownFields});
{false, _} -> error(missing_password_hash)
end.
parse_key(Key) ->
Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]),

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV
BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD
DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD
VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE
AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1
EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2
juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur
MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ
uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D
tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ
KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj
EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB
/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa
ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5
CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y
E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo
88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30
IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==
-----END CERTIFICATE-----

View File

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDEzCCAfugAwIBAgIBAjANBgkqhkiG9w0BAQsFADA/MQswCQYDVQQGEwJDTjER
MA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UEAwwGUm9vdENB
MB4XDTIwMDUwODA4MDcwNVoXDTMwMDUwNjA4MDcwNVowPzELMAkGA1UEBhMCQ04x
ETAPBgNVBAgMCGhhbmd6aG91MQwwCgYDVQQKDANFTVExDzANBgNVBAMMBlNlcnZl
cjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALNeWT3pE+QFfiRJzKmn
AMUrWo3K2j/Tm3+Xnl6WLz67/0rcYrJbbKvS3uyRP/stXyXEKw9CepyQ1ViBVFkW
Aoy8qQEOWFDsZc/5UzhXUnb6LXr3qTkFEjNmhj+7uzv/lbBxlUG1NlYzSeOB6/RT
8zH/lhOeKhLnWYPXdXKsa1FL6ij4X8DeDO1kY7fvAGmBn/THh1uTpDizM4YmeI+7
4dmayA5xXvARte5h4Vu5SIze7iC057N+vymToMk2Jgk+ZZFpyXrnq+yo6RaD3ANc
lrc4FbeUQZ5a5s5Sxgs9a0Y3WMG+7c5VnVXcbjBRz/aq2NtOnQQjikKKQA8GF080
BQkCAwEAAaMaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcNAQEL
BQADggEBAJefnMZpaRDHQSNUIEL3iwGXE9c6PmIsQVE2ustr+CakBp3TZ4l0enLt
iGMfEVFju69cO4oyokWv+hl5eCMkHBf14Kv51vj448jowYnF1zmzn7SEzm5Uzlsa
sqjtAprnLyof69WtLU1j5rYWBuFX86yOTwRAFNjm9fvhAcrEONBsQtqipBWkMROp
iUYMkRqbKcQMdwxov+lHBYKq9zbWRoqLROAn54SRqgQk6c15JdEfgOOjShbsOkIH
UhqcwRkQic7n1zwHVGVDgNIZVgmJ2IdIWBlPEC7oLrRrBD/X1iEEXtKab6p5o22n
KB5mN+iQaE+Oe2cpGKZJiJRdM+IqDDQ=
-----END CERTIFICATE-----

View File

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDEzCCAfugAwIBAgIBATANBgkqhkiG9w0BAQsFADA/MQswCQYDVQQGEwJDTjER
MA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UEAwwGUm9vdENB
MB4XDTIwMDUwODA4MDY1N1oXDTMwMDUwNjA4MDY1N1owPzELMAkGA1UEBhMCQ04x
ETAPBgNVBAgMCGhhbmd6aG91MQwwCgYDVQQKDANFTVExDzANBgNVBAMMBkNsaWVu
dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMy4hoksKcZBDbY680u6
TS25U51nuB1FBcGMlF9B/t057wPOlxF/OcmbxY5MwepS41JDGPgulE1V7fpsXkiW
1LUimYV/tsqBfymIe0mlY7oORahKji7zKQ2UBIVFhdlvQxunlIDnw6F9popUgyHt
dMhtlgZK8oqRwHxO5dbfoukYd6J/r+etS5q26sgVkf3C6dt0Td7B25H9qW+f7oLV
PbcHYCa+i73u9670nrpXsC+Qc7Mygwa2Kq/jwU+ftyLQnOeW07DuzOwsziC/fQZa
nbxR+8U9FNftgRcC3uP/JMKYUqsiRAuaDokARZxVTV5hUElfpO6z6/NItSDvvh3i
eikCAwEAAaMaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcNAQEL
BQADggEBABchYxKo0YMma7g1qDswJXsR5s56Czx/I+B41YcpMBMTrRqpUC0nHtLk
M7/tZp592u/tT8gzEnQjZLKBAhFeZaR3aaKyknLqwiPqJIgg0pgsBGITrAK3Pv4z
5/YvAJJKgTe5UdeTz6U4lvNEux/4juZ4pmqH4qSFJTOzQS7LmgSmNIdd072rwXBd
UzcSHzsJgEMb88u/LDLjj1pQ7AtZ4Tta8JZTvcgBFmjB0QUi6fgkHY6oGat/W4kR
jSRUBlMUbM/drr2PVzRc2dwbFIl3X+ZE6n5Sl3ZwRAC/s92JU6CPMRW02muVu6xl
goraNgPISnrbpR6KjxLZkVembXzjNNc=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAzLiGiSwpxkENtjrzS7pNLblTnWe4HUUFwYyUX0H+3TnvA86X
EX85yZvFjkzB6lLjUkMY+C6UTVXt+mxeSJbUtSKZhX+2yoF/KYh7SaVjug5FqEqO
LvMpDZQEhUWF2W9DG6eUgOfDoX2milSDIe10yG2WBkryipHAfE7l1t+i6Rh3on+v
561LmrbqyBWR/cLp23RN3sHbkf2pb5/ugtU9twdgJr6Lve73rvSeulewL5BzszKD
BrYqr+PBT5+3ItCc55bTsO7M7CzOIL99BlqdvFH7xT0U1+2BFwLe4/8kwphSqyJE
C5oOiQBFnFVNXmFQSV+k7rPr80i1IO++HeJ6KQIDAQABAoIBAGWgvPjfuaU3qizq
uti/FY07USz0zkuJdkANH6LiSjlchzDmn8wJ0pApCjuIE0PV/g9aS8z4opp5q/gD
UBLM/a8mC/xf2EhTXOMrY7i9p/I3H5FZ4ZehEqIw9sWKK9YzC6dw26HabB2BGOnW
5nozPSQ6cp2RGzJ7BIkxSZwPzPnVTgy3OAuPOiJytvK+hGLhsNaT+Y9bNDvplVT2
ZwYTV8GlHZC+4b2wNROILm0O86v96O+Qd8nn3fXjGHbMsAnONBq10bZS16L4fvkH
5G+W/1PeSXmtZFppdRRDxIW+DWcXK0D48WRliuxcV4eOOxI+a9N2ZJZZiNLQZGwg
w3A8+mECgYEA8HuJFrlRvdoBe2U/EwUtG74dcyy30L4yEBnN5QscXmEEikhaQCfX
Wm6EieMcIB/5I5TQmSw0cmBMeZjSXYoFdoI16/X6yMMuATdxpvhOZGdUGXxhAH+x
xoTUavWZnEqW3fkUU71kT5E2f2i+0zoatFESXHeslJyz85aAYpP92H0CgYEA2e5A
Yozt5eaA1Gyhd8SeptkEU4xPirNUnVQHStpMWUb1kzTNXrPmNWccQ7JpfpG6DcYl
zUF6p6mlzY+zkMiyPQjwEJlhiHM2NlL1QS7td0R8ewgsFoyn8WsBI4RejWrEG9td
EDniuIw+pBFkcWthnTLHwECHdzgquToyTMjrBB0CgYEA28tdGbrZXhcyAZEhHAZA
Gzog+pKlkpEzeonLKIuGKzCrEKRecIK5jrqyQsCjhS0T7ZRnL4g6i0s+umiV5M5w
fcc292pEA1h45L3DD6OlKplSQVTv55/OYS4oY3YEJtf5mfm8vWi9lQeY8sxOlQpn
O+VZTdBHmTC8PGeTAgZXHZUCgYA6Tyv88lYowB7SN2qQgBQu8jvdGtqhcs/99GCr
H3N0I69LPsKAR0QeH8OJPXBKhDUywESXAaEOwS5yrLNP1tMRz5Vj65YUCzeDG3kx
gpvY4IMp7ArX0bSRvJ6mYSFnVxy3k174G3TVCfksrtagHioVBGQ7xUg5ltafjrms
n8l55QKBgQDVzU8tQvBVqY8/1lnw11Vj4fkE/drZHJ5UkdC1eenOfSWhlSLfUJ8j
ds7vEWpRPPoVuPZYeR1y78cyxKe1GBx6Wa2lF5c7xjmiu0xbRnrxYeLolce9/ntp
asClqpnHT8/VJYTD7Kqj0fouTTZf0zkig/y+2XERppd8k+pSKjUCPQ==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAs15ZPekT5AV+JEnMqacAxStajcraP9Obf5eeXpYvPrv/Stxi
sltsq9Le7JE/+y1fJcQrD0J6nJDVWIFUWRYCjLypAQ5YUOxlz/lTOFdSdvotevep
OQUSM2aGP7u7O/+VsHGVQbU2VjNJ44Hr9FPzMf+WE54qEudZg9d1cqxrUUvqKPhf
wN4M7WRjt+8AaYGf9MeHW5OkOLMzhiZ4j7vh2ZrIDnFe8BG17mHhW7lIjN7uILTn
s36/KZOgyTYmCT5lkWnJeuer7KjpFoPcA1yWtzgVt5RBnlrmzlLGCz1rRjdYwb7t
zlWdVdxuMFHP9qrY206dBCOKQopADwYXTzQFCQIDAQABAoIBAQCuvCbr7Pd3lvI/
n7VFQG+7pHRe1VKwAxDkx2t8cYos7y/QWcm8Ptwqtw58HzPZGWYrgGMCRpzzkRSF
V9g3wP1S5Scu5C6dBu5YIGc157tqNGXB+SpdZddJQ4Nc6yGHXYERllT04ffBGc3N
WG/oYS/1cSteiSIrsDy/91FvGRCi7FPxH3wIgHssY/tw69s1Cfvaq5lr2NTFzxIG
xCvpJKEdSfVfS9I7LYiymVjst3IOR/w76/ZFY9cRa8ZtmQSWWsm0TUpRC1jdcbkm
ZoJptYWlP+gSwx/fpMYftrkJFGOJhHJHQhwxT5X/ajAISeqjjwkWSEJLwnHQd11C
Zy2+29lBAoGBANlEAIK4VxCqyPXNKfoOOi5dS64NfvyH4A1v2+KaHWc7lqaqPN49
ezfN2n3X+KWx4cviDD914Yc2JQ1vVJjSaHci7yivocDo2OfZDmjBqzaMp/y+rX1R
/f3MmiTqMa468rjaxI9RRZu7vDgpTR+za1+OBCgMzjvAng8dJuN/5gjlAoGBANNY
uYPKtearBmkqdrSV7eTUe49Nhr0XotLaVBH37TCW0Xv9wjO2xmbm5Ga/DCtPIsBb
yPeYwX9FjoasuadUD7hRvbFu6dBa0HGLmkXRJZTcD7MEX2Lhu4BuC72yDLLFd0r+
Ep9WP7F5iJyagYqIZtz+4uf7gBvUDdmvXz3sGr1VAoGAdXTD6eeKeiI6PlhKBztF
zOb3EQOO0SsLv3fnodu7ZaHbUgLaoTMPuB17r2jgrYM7FKQCBxTNdfGZmmfDjlLB
0xZ5wL8ibU30ZXL8zTlWPElST9sto4B+FYVVF/vcG9sWeUUb2ncPcJ/Po3UAktDG
jYQTTyuNGtSJHpad/YOZctkCgYBtWRaC7bq3of0rJGFOhdQT9SwItN/lrfj8hyHA
OjpqTV4NfPmhsAtu6j96OZaeQc+FHvgXwt06cE6Rt4RG4uNPRluTFgO7XYFDfitP
vCppnoIw6S5BBvHwPP+uIhUX2bsi/dm8vu8tb+gSvo4PkwtFhEr6I9HglBKmcmog
q6waEQKBgHyecFBeM6Ls11Cd64vborwJPAuxIW7HBAFj/BS99oeG4TjBx4Sz2dFd
rzUibJt4ndnHIvCN8JQkjNG14i9hJln+H3mRss8fbZ9vQdqG+2vOWADYSzzsNI55
RFY7JjluKcVkp/zCDeUxTU3O6sS+v6/3VE11Cob6OYQx3lN5wrZ3
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,134 @@
## create emqx.io
dn:dc=emqx,dc=io
objectclass: top
objectclass: dcobject
objectclass: organization
dc:emqx
o:emqx,Inc.
# create testdevice.emqx.io
dn:ou=testdevice,dc=emqx,dc=io
objectClass: top
objectclass:organizationalUnit
ou:testdevice
# create user admin
dn:uid=admin,ou=testdevice,dc=emqx,dc=io
objectClass: top
objectClass: simpleSecurityObject
objectClass: account
userPassword:: e1NIQX1XNnBoNU1tNVB6OEdnaVVMYlBnekczN21qOWc9
uid: admin
## create user=mqttuser0001,
# password=mqttuser0001,
# passhash={SHA}mlb3fat40MKBTXUVZwCKmL73R/0=
# base64passhash=e1NIQX1tbGIzZmF0NDBNS0JUWFVWWndDS21MNzNSLzA9
dn:uid=mqttuser0001,ou=testdevice,dc=emqx,dc=io
objectClass: top
objectClass: mqttUser
objectClass: mqttDevice
objectClass: mqttSecurity
uid: mqttuser0001
isEnabled: TRUE
mqttAccountName: user1
mqttPublishTopic: mqttuser0001/pub/1
mqttPublishTopic: mqttuser0001/pub/+
mqttPublishTopic: mqttuser0001/pub/#
mqttSubscriptionTopic: mqttuser0001/sub/1
mqttSubscriptionTopic: mqttuser0001/sub/+
mqttSubscriptionTopic: mqttuser0001/sub/#
mqttPubSubTopic: mqttuser0001/pubsub/1
mqttPubSubTopic: mqttuser0001/pubsub/+
mqttPubSubTopic: mqttuser0001/pubsub/#
userPassword:: e1NIQX1tbGIzZmF0NDBNS0JUWFVWWndDS21MNzNSLzA9
## create user=mqttuser0002
# password=mqttuser0002,
# passhash={SSHA}n9XdtoG4Q/TQ3TQF4Y+khJbMBH4qXj4M
# base64passhash=e1NTSEF9bjlYZHRvRzRRL1RRM1RRRjRZK2toSmJNQkg0cVhqNE0=
dn:uid=mqttuser0002,ou=testdevice,dc=emqx,dc=io
objectClass: top
objectClass: mqttUser
objectClass: mqttDevice
objectClass: mqttSecurity
uid: mqttuser0002
isEnabled: TRUE
mqttAccountName: user2
mqttPublishTopic: mqttuser0002/pub/1
mqttPublishTopic: mqttuser0002/pub/+
mqttPublishTopic: mqttuser0002/pub/#
mqttSubscriptionTopic: mqttuser0002/sub/1
mqttSubscriptionTopic: mqttuser0002/sub/+
mqttSubscriptionTopic: mqttuser0002/sub/#
mqttPubSubTopic: mqttuser0002/pubsub/1
mqttPubSubTopic: mqttuser0002/pubsub/+
mqttPubSubTopic: mqttuser0002/pubsub/#
userPassword:: e1NTSEF9bjlYZHRvRzRRL1RRM1RRRjRZK2toSmJNQkg0cVhqNE0=
## create user mqttuser0003
# password=mqttuser0003,
# passhash={MD5}ybsPGoaK3nDyiQvveiCOIw==
# base64passhash=e01ENX15YnNQR29hSzNuRHlpUXZ2ZWlDT0l3PT0=
dn:uid=mqttuser0003,ou=testdevice,dc=emqx,dc=io
objectClass: top
objectClass: mqttUser
objectClass: mqttDevice
objectClass: mqttSecurity
uid: mqttuser0003
isEnabled: TRUE
mqttPublishTopic: mqttuser0003/pub/1
mqttPublishTopic: mqttuser0003/pub/+
mqttPublishTopic: mqttuser0003/pub/#
mqttSubscriptionTopic: mqttuser0003/sub/1
mqttSubscriptionTopic: mqttuser0003/sub/+
mqttSubscriptionTopic: mqttuser0003/sub/#
mqttPubSubTopic: mqttuser0003/pubsub/1
mqttPubSubTopic: mqttuser0003/pubsub/+
mqttPubSubTopic: mqttuser0003/pubsub/#
userPassword:: e01ENX15YnNQR29hSzNuRHlpUXZ2ZWlDT0l3PT0=
## create user mqttuser0004
# password=mqttuser0004,
# passhash={MD5}2Br6pPDSEDIEvUlu9+s+MA==
# base64passhash=e01ENX0yQnI2cFBEU0VESUV2VWx1OStzK01BPT0=
dn:uid=mqttuser0004,ou=testdevice,dc=emqx,dc=io
objectClass: top
objectClass: mqttUser
objectClass: mqttDevice
objectClass: mqttSecurity
uid: mqttuser0004
isEnabled: TRUE
mqttPublishTopic: mqttuser0004/pub/1
mqttPublishTopic: mqttuser0004/pub/+
mqttPublishTopic: mqttuser0004/pub/#
mqttSubscriptionTopic: mqttuser0004/sub/1
mqttSubscriptionTopic: mqttuser0004/sub/+
mqttSubscriptionTopic: mqttuser0004/sub/#
mqttPubSubTopic: mqttuser0004/pubsub/1
mqttPubSubTopic: mqttuser0004/pubsub/+
mqttPubSubTopic: mqttuser0004/pubsub/#
userPassword: {MD5}2Br6pPDSEDIEvUlu9+s+MA==
## create user mqttuser0005
# password=mqttuser0005,
# passhash={SHA}jKnxeEDGR14kE8AR7yuVFOelhz4=
# base64passhash=e1NIQX1qS254ZUVER1IxNGtFOEFSN3l1VkZPZWxoejQ9
objectClass: top
dn:uid=mqttuser0005,ou=testdevice,dc=emqx,dc=io
objectClass: mqttUser
objectClass: mqttDevice
objectClass: mqttSecurity
uid: mqttuser0005
isEnabled: TRUE
mqttPublishTopic: mqttuser0005/pub/1
mqttPublishTopic: mqttuser0005/pub/+
mqttPublishTopic: mqttuser0005/pub/#
mqttSubscriptionTopic: mqttuser0005/sub/1
mqttSubscriptionTopic: mqttuser0005/sub/+
mqttSubscriptionTopic: mqttuser0005/sub/#
mqttPubSubTopic: mqttuser0005/pubsub/1
mqttPubSubTopic: mqttuser0005/pubsub/+
mqttPubSubTopic: mqttuser0005/pubsub/#
userPassword: {SHA}jKnxeEDGR14kE8AR7yuVFOelhz4=

View File

@ -0,0 +1,46 @@
#
# Preliminary Apple OS X Native LDAP Schema
# This file is subject to change.
#
attributetype ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.1.3 NAME 'isEnabled'
EQUALITY booleanMatch
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
SINGLE-VALUE
USAGE userApplications )
attributetype ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.4.1 NAME ( 'mqttPublishTopic' 'mpt' )
EQUALITY caseIgnoreMatch
SUBSTR caseIgnoreSubstringsMatch
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
USAGE userApplications )
attributetype ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.4.2 NAME ( 'mqttSubscriptionTopic' 'mst' )
EQUALITY caseIgnoreMatch
SUBSTR caseIgnoreSubstringsMatch
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
USAGE userApplications )
attributetype ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.4.3 NAME ( 'mqttPubSubTopic' 'mpst' )
EQUALITY caseIgnoreMatch
SUBSTR caseIgnoreSubstringsMatch
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
USAGE userApplications )
attributetype ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.4.4 NAME ( 'mqttAccountName' 'man' )
EQUALITY caseIgnoreMatch
SUBSTR caseIgnoreSubstringsMatch
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
USAGE userApplications )
objectclass ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.4 NAME 'mqttUser'
AUXILIARY
MAY ( mqttPublishTopic $ mqttSubscriptionTopic $ mqttPubSubTopic $ mqttAccountName) )
objectclass ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.2 NAME 'mqttDevice'
SUP top
STRUCTURAL
MUST ( uid )
MAY ( isEnabled ) )
objectclass ( 1.3.6.1.4.1.11.2.53.2.2.3.1.2.3.3 NAME 'mqttSecurity'
SUP top
AUXILIARY
MAY ( userPassword $ userPKCS12 $ pwdAttribute $ pwdLockout ) )

View File

@ -43,8 +43,14 @@ groups() ->
[].
init_per_testcase(_, Config) ->
delete_authenticators([authentication], ?GLOBAL),
delete_authenticators([listeners, tcp, default, authentication], ?TCP_DEFAULT),
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL),
emqx_authn_test_lib:delete_authenticators(
[listeners, tcp, default, authentication],
?TCP_DEFAULT),
{atomic, ok} = mria:clear_table(emqx_authn_mnesia),
Config.
@ -390,20 +396,6 @@ test_authenticator_import_users(PathPrefix) ->
%% Helpers
%%------------------------------------------------------------------------------
delete_authenticators(Path, Chain) ->
case emqx_authentication:list_authenticators(Chain) of
{error, _} -> ok;
{ok, Authenticators} ->
lists:foreach(
fun(#{id := ID}) ->
emqx:update_config(
Path,
{delete_authenticator, Chain, ID},
#{rawconf_with_defaults => true})
end,
Authenticators)
end.
request(Method, Url) ->
request(Method, Url, []).

View File

@ -23,6 +23,8 @@
-include("emqx_authn.hrl").
-define(AUTHN_ID, <<"mechanism:backend">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -75,33 +77,41 @@ t_check_schema(_Config) ->
t_create(_) ->
Config0 = config(),
{ok, _} = emqx_authn_mnesia:create(Config0),
{ok, _} = emqx_authn_mnesia:create(?AUTHN_ID, Config0),
Config1 = Config0#{password_hash_algorithm => #{name => sha256}},
{ok, _} = emqx_authn_mnesia:create(Config1).
{ok, _} = emqx_authn_mnesia:create(?AUTHN_ID, Config1).
t_update(_) ->
Config0 = config(),
{ok, State} = emqx_authn_mnesia:create(Config0),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config0),
Config1 = Config0#{password_hash_algorithm => #{name => sha256}},
{ok, _} = emqx_authn_mnesia:update(Config1, State).
t_destroy(_) ->
Config = config(),
{ok, State0} = emqx_authn_mnesia:create(Config),
OtherId = list_to_binary([?AUTHN_ID, <<"-other">>]),
{ok, State0} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
{ok, StateOther} = emqx_authn_mnesia:create(OtherId, Config),
User = #{user_id => <<"u">>, password => <<"p">>},
{ok, _} = emqx_authn_mnesia:add_user(User, State0),
{ok, _} = emqx_authn_mnesia:add_user(User, StateOther),
{ok, _} = emqx_authn_mnesia:lookup_user(<<"u">>, State0),
{ok, _} = emqx_authn_mnesia:lookup_user(<<"u">>, StateOther),
ok = emqx_authn_mnesia:destroy(State0),
{ok, State1} = emqx_authn_mnesia:create(Config),
{error, not_found} = emqx_authn_mnesia:lookup_user(<<"u">>, State1).
{ok, State1} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
{error,not_found} = emqx_authn_mnesia:lookup_user(<<"u">>, State1),
{ok, _} = emqx_authn_mnesia:lookup_user(<<"u">>, StateOther).
t_authenticate(_) ->
Config = config(),
{ok, State} = emqx_authn_mnesia:create(Config),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
User = #{user_id => <<"u">>, password => <<"p">>},
{ok, _} = emqx_authn_mnesia:add_user(User, State),
@ -118,7 +128,7 @@ t_authenticate(_) ->
t_add_user(_) ->
Config = config(),
{ok, State} = emqx_authn_mnesia:create(Config),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
User = #{user_id => <<"u">>, password => <<"p">>},
{ok, _} = emqx_authn_mnesia:add_user(User, State),
@ -126,7 +136,7 @@ t_add_user(_) ->
t_delete_user(_) ->
Config = config(),
{ok, State} = emqx_authn_mnesia:create(Config),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
{error, not_found} = emqx_authn_mnesia:delete_user(<<"u">>, State),
User = #{user_id => <<"u">>, password => <<"p">>},
@ -137,7 +147,7 @@ t_delete_user(_) ->
t_update_user(_) ->
Config = config(),
{ok, State} = emqx_authn_mnesia:create(Config),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
User = #{user_id => <<"u">>, password => <<"p">>},
{ok, _} = emqx_authn_mnesia:add_user(User, State),
@ -158,7 +168,7 @@ t_update_user(_) ->
t_list_users(_) ->
Config = config(),
{ok, State} = emqx_authn_mnesia:create(Config),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
Users = [#{user_id => <<"u1">>, password => <<"p">>},
#{user_id => <<"u2">>, password => <<"p">>},
@ -182,7 +192,7 @@ t_list_users(_) ->
t_import_users(_) ->
Config0 = config(),
Config = Config0#{password_hash_algorithm => #{name => sha256}},
{ok, State} = emqx_authn_mnesia:create(Config),
{ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config),
ok = emqx_authn_mnesia:import_users(
data_filename(<<"user-credentials.json">>),
@ -227,6 +237,5 @@ data_filename(Name) ->
config() ->
#{user_id_type => username,
password_hash_algorithm => #{name => bcrypt,
salt_rounds => 8},
'_unique' => <<"unique">>
salt_rounds => 8}
}.

View File

@ -0,0 +1,400 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authn_redis_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(REDIS_HOST, "redis").
-define(REDIS_PORT, 6379).
-define(REDIS_PROBE_TIMEOUT, 1000).
-define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>).
-define(PATH, [authentication]).
all() ->
[{group, require_seeds}, t_create, t_create_invalid].
groups() ->
[{require_seeds, [], [t_authenticate, t_update, t_destroy]}].
init_per_testcase(_, Config) ->
emqx_authentication:initialize_authentication(?GLOBAL, []),
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL),
Config.
init_per_group(require_seeds, Config) ->
ok = init_seeds(),
Config.
end_per_group(require_seeds, Config) ->
ok = drop_seeds(),
Config.
init_per_suite(Config) ->
case is_redis_available() of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?REDIS_RESOURCE,
emqx_connector_redis,
redis_config()),
Config;
false ->
{skip, no_redis}
end.
end_per_suite(_Config) ->
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL),
ok = emqx_resource:remove_local(?REDIS_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_create(_Config) ->
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL),
AuthConfig = raw_redis_auth_config(),
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}),
{ok, [#{provider := emqx_authn_redis}]} = emqx_authentication:list_authenticators(?GLOBAL).
t_create_invalid(_Config) ->
AuthConfig = raw_redis_auth_config(),
InvalidConfigs =
[
maps:without([server], AuthConfig),
AuthConfig#{server => <<"unknownhost:3333">>},
AuthConfig#{password => <<"wrongpass">>},
AuthConfig#{database => <<"5678">>},
AuthConfig#{
query => <<"MGET password_hash:${mqtt-username} salt:${mqtt-username}">>},
AuthConfig#{
query => <<"HMGET mqtt_user:${mqtt-username} password_hash invalid_field">>},
AuthConfig#{
query => <<"HMGET mqtt_user:${mqtt-username} salt is_superuser">>}
],
lists:foreach(
fun(Config) ->
{error, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, Config}),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end,
InvalidConfigs).
t_authenticate(_Config) ->
ok = lists:foreach(
fun(Sample) ->
ct:pal("test_user_auth sample: ~p", [Sample]),
test_user_auth(Sample)
end,
user_seeds()).
test_user_auth(#{credentials := Credentials0,
config_params := SpecificConfgParams,
result := Result}) ->
AuthConfig = maps:merge(raw_redis_auth_config(), SpecificConfgParams),
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}),
Credentials = Credentials0#{
listener => 'tcp:default',
protocol => mqtt
},
?assertEqual(Result, emqx_access_control:authenticate(Credentials)),
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL).
t_destroy(_Config) ->
AuthConfig = raw_redis_auth_config(),
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}),
{ok, [#{provider := emqx_authn_redis, state := State}]}
= emqx_authentication:list_authenticators(?GLOBAL),
{ok, _} = emqx_authn_redis:authenticate(
#{username => <<"plain">>,
password => <<"plain">>
},
State),
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL),
% Authenticator should not be usable anymore
?assertException(
error,
_,
emqx_authn_redis:authenticate(
#{username => <<"plain">>,
password => <<"plain">>
},
State)).
t_update(_Config) ->
CorrectConfig = raw_redis_auth_config(),
IncorrectConfig =
CorrectConfig#{
query => <<"HMGET invalid_key:${mqtt-username} password_hash salt is_superuser">>},
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, IncorrectConfig}),
{error, not_authorized} = emqx_access_control:authenticate(
#{username => <<"plain">>,
password => <<"plain">>,
listener => 'tcp:default',
protocol => mqtt
}),
% We update with config with correct query, provider should update and work properly
{ok, _} = emqx:update_config(
?PATH,
{update_authenticator, ?GLOBAL, <<"password-based:redis">>, CorrectConfig}),
{ok,_} = emqx_access_control:authenticate(
#{username => <<"plain">>,
password => <<"plain">>,
listener => 'tcp:default',
protocol => mqtt
}).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
raw_redis_auth_config() ->
#{
mechanism => <<"password-based">>,
password_hash_algorithm => <<"plain">>,
salt_position => <<"suffix">>,
enable => <<"true">>,
backend => <<"redis">>,
query => <<"HMGET mqtt_user:${mqtt-username} password_hash salt is_superuser">>,
database => <<"1">>,
password => <<"public">>,
server => redis_server()
}.
user_seeds() ->
[#{data => #{
password_hash => "plainsalt",
salt => "salt",
is_superuser => "1"
},
credentials => #{
username => <<"plain">>,
password => <<"plain">>},
key => "mqtt_user:plain",
config_params => #{},
result => {ok,#{is_superuser => true}}
},
#{data => #{
password_hash => "9b4d0c43d206d48279e69b9ad7132e22",
salt => "salt",
is_superuser => "0"
},
credentials => #{
username => <<"md5">>,
password => <<"md5">>
},
key => "mqtt_user:md5",
config_params => #{
password_hash_algorithm => <<"md5">>,
salt_position => <<"suffix">>
},
result => {ok,#{is_superuser => false}}
},
#{data => #{
password_hash => "ac63a624e7074776d677dd61a003b8c803eb11db004d0ec6ae032a5d7c9c5caf",
salt => "salt",
is_superuser => "1"
},
credentials => #{
clientid => <<"sha256">>,
password => <<"sha256">>
},
key => "mqtt_user:sha256",
config_params => #{
query => <<"HMGET mqtt_user:${mqtt-clientid} password_hash salt is_superuser">>,
password_hash_algorithm => <<"sha256">>,
salt_position => <<"prefix">>
},
result => {ok,#{is_superuser => true}}
},
#{data => #{
password_hash => "$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u",
salt => "$2b$12$wtY3h20mUjjmeaClpqZVve",
is_superuser => "0"
},
credentials => #{
username => <<"bcrypt">>,
password => <<"bcrypt">>
},
key => "mqtt_user:bcrypt",
config_params => #{
password_hash_algorithm => <<"bcrypt">>,
salt_position => <<"suffix">> % should be ignored
},
result => {ok,#{is_superuser => false}}
},
#{data => #{
password_hash => "$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u",
salt => "$2b$12$wtY3h20mUjjmeaClpqZVve",
is_superuser => "0"
},
credentials => #{
username => <<"bcrypt0">>,
password => <<"bcrypt">>
},
key => "mqtt_user:bcrypt0",
config_params => #{
% clientid variable & username credentials
query => <<"HMGET mqtt_client:${mqtt-clientid} password_hash salt is_superuser">>,
password_hash_algorithm => <<"bcrypt">>,
salt_position => <<"suffix">>
},
result => {error,not_authorized}
},
#{data => #{
password_hash => "$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u",
salt => "$2b$12$wtY3h20mUjjmeaClpqZVve",
is_superuser => "0"
},
credentials => #{
username => <<"bcrypt1">>,
password => <<"bcrypt">>
},
key => "mqtt_user:bcrypt1",
config_params => #{
% Bad key in query
query => <<"HMGET badkey:${mqtt-username} password_hash salt is_superuser">>,
password_hash_algorithm => <<"bcrypt">>,
salt_position => <<"suffix">>
},
result => {error,not_authorized}
},
#{data => #{
password_hash => "$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u",
salt => "$2b$12$wtY3h20mUjjmeaClpqZVve",
is_superuser => "0"
},
credentials => #{
username => <<"bcrypt2">>,
% Wrong password
password => <<"wrongpass">>
},
key => "mqtt_user:bcrypt2",
config_params => #{
query => <<"HMGET mqtt_user:${mqtt-username} password_hash salt is_superuser">>,
password_hash_algorithm => <<"bcrypt">>,
salt_position => <<"suffix">>
},
result => {error,bad_username_or_password}
}
].
init_seeds() ->
ok = drop_seeds(),
lists:foreach(
fun(#{key := UserKey, data := Values}) ->
lists:foreach(fun({Key, Value}) ->
q(["HSET", UserKey, atom_to_list(Key), Value])
end,
maps:to_list(Values))
end,
user_seeds()).
q(Command) ->
emqx_resource:query(
?REDIS_RESOURCE,
{cmd, Command}).
drop_seeds() ->
lists:foreach(
fun(#{key := UserKey}) ->
q(["DEL", UserKey])
end,
user_seeds()).
redis_server() ->
iolist_to_binary(
io_lib:format(
"~s:~b",
[?REDIS_HOST, ?REDIS_PORT])).
is_redis_available() ->
case gen_tcp:connect(?REDIS_HOST, ?REDIS_PORT, [], ?REDIS_PROBE_TIMEOUT) of
{ok, Socket} ->
gen_tcp:close(Socket),
true;
{error, _} ->
false
end.
redis_config() ->
#{auto_reconnect => true,
database => 1,
pool_size => 8,
redis_type => single,
password => "public",
server => {?REDIS_HOST, ?REDIS_PORT},
ssl => #{enable => false}
}.
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -31,3 +31,17 @@ built_in_database_example() ->
jwt_example() ->
authenticator_example(jwt).
delete_authenticators(Path, Chain) ->
case emqx_authentication:list_authenticators(Chain) of
{error, _} -> ok;
{ok, Authenticators} ->
lists:foreach(
fun(#{id := ID}) ->
emqx:update_config(
Path,
{delete_authenticator, Chain, ID},
#{rawconf_with_defaults => true})
end,
Authenticators)
end.

View File

@ -44,7 +44,7 @@
rate_last5m => RATE_5,
rate_max => RATE_MAX
}).
-define(metrics(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
-define(MATCH_METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{
success := SUCC,
failed := FAILED,
@ -107,7 +107,7 @@ resp_schema() ->
more_props_resp_schema(AddMetadata).
more_props_resp_schema(AddMetadata) ->
#{oneOf := Schema} = req_schema(),
#{'oneOf' := Schema} = req_schema(),
Schema1 = [S#{properties => AddMetadata(Prop)}
|| S = #{properties := Prop} <- Schema],
#{'oneOf' => Schema1}.
@ -258,7 +258,7 @@ crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
#{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
@ -288,7 +288,7 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
[BridgeType, BridgeName]) of
ok -> {200};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
{500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
end).
ensure_bridge(BridgeType, BridgeName, Conf) ->
@ -338,8 +338,8 @@ collect_metrics(Bridges) ->
aggregate_metrics(AllMetrics) ->
InitMetrics = ?METRICS(0,0,0,0,0),
lists:foldl(fun(#{metrics := ?metrics(Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
?metrics(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
lists:foldl(fun(#{metrics := ?MATCH_METRICS(Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
?MATCH_METRICS(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
?METRICS(Succ1 + Succ0, Failed1 + Failed0,
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
end, InitMetrics, AllMetrics).

View File

@ -20,4 +20,4 @@
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
#}
#}

View File

@ -19,10 +19,10 @@
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2

View File

@ -17,11 +17,12 @@
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]).
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
@ -60,7 +61,10 @@ on_start(InstId, #{servers := Servers0,
SslOpts = case maps:get(enable, SSL) of
true ->
[{ssl, true},
{sslopts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
{sslopts, emqx_plugin_libs_ssl:save_files_return_opts(
SSL,
"connectors",
InstId)}
];
false -> [{ssl, false}]
end,
@ -86,7 +90,10 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P
?SLOG(debug, #{msg => "ldap connector received request",
request => Request, connector => InstId,
state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of
case Result = ecpool:pick_and_do(
PoolName,
{?MODULE, search, [Base, Filter, Attributes]},
no_handover) of
{error, Reason} ->
?SLOG(error, #{msg => "ldap connector do request failed",
request => Request, connector => InstId,
@ -110,7 +117,7 @@ search(Conn, Base, Filter, Attributes) ->
eldap2:search(Conn, [{base, Base},
{filter, Filter},
{attributes, Attributes},
{deref, eldap2:derefFindingBaseObj()}]).
{deref, eldap2:'derefFindingBaseObj'()}]).
%% ===================================================================
connect(Opts) ->

View File

@ -17,9 +17,14 @@
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-type server() :: emqx_schema:ip_port().
-reflect_type([server/0]).
-typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
@ -104,7 +109,11 @@ on_start(InstId, Config = #{mongo_type := Type,
SslOpts = case maps:get(enable, SSL) of
true ->
[{ssl, true},
{ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)}
{ssl_opts,
emqx_plugin_libs_ssl:save_files_return_opts(
SSL,
"connectors",
InstId)}
];
false -> [{ssl, false}]
end,
@ -122,12 +131,17 @@ on_stop(InstId, #{poolname := PoolName}) ->
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) ->
on_query(InstId,
{Action, Collection, Selector, Docs},
AfterQuery,
#{poolname := PoolName} = State) ->
Request = {Action, Collection, Selector, Docs},
?SLOG(debug, #{msg => "mongodb connector received request",
request => Request, connector => InstId,
state => State}),
case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of
case ecpool:pick_and_do(PoolName,
{?MODULE, mongo_query, [Action, Collection, Selector, Docs]},
no_handover) of
{error, Reason} ->
?SLOG(error, #{msg => "mongodb connector do query failed",
request => Request, reason => Reason,
@ -136,7 +150,7 @@ on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname :=
{error, Reason};
{ok, Cursor} when is_pid(Cursor) ->
emqx_resource:query_success(AfterQuery),
mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000);
mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000);
Result ->
emqx_resource:query_success(AfterQuery),
Result
@ -184,29 +198,29 @@ init_type(#{type := rs, replica_set_name := ReplicaSetName}) ->
init_type(#{type := Type}) ->
Type.
init_topology_options([{pool_size, Val}| R], Acc) ->
init_topology_options(R, [{pool_size, Val}| Acc]);
init_topology_options([{max_overflow, Val}| R], Acc) ->
init_topology_options(R, [{max_overflow, Val}| Acc]);
init_topology_options([{overflow_ttl, Val}| R], Acc) ->
init_topology_options(R, [{overflow_ttl, Val}| Acc]);
init_topology_options([{overflow_check_period, Val}| R], Acc) ->
init_topology_options(R, [{overflow_check_period, Val}| Acc]);
init_topology_options([{local_threshold_ms, Val}| R], Acc) ->
init_topology_options(R, [{'localThresholdMS', Val}| Acc]);
init_topology_options([{connect_timeout_ms, Val}| R], Acc) ->
init_topology_options(R, [{'connectTimeoutMS', Val}| Acc]);
init_topology_options([{socket_timeout_ms, Val}| R], Acc) ->
init_topology_options(R, [{'socketTimeoutMS', Val}| Acc]);
init_topology_options([{server_selection_timeout_ms, Val}| R], Acc) ->
init_topology_options(R, [{'serverSelectionTimeoutMS', Val}| Acc]);
init_topology_options([{wait_queue_timeout_ms, Val}| R], Acc) ->
init_topology_options(R, [{'waitQueueTimeoutMS', Val}| Acc]);
init_topology_options([{heartbeat_frequency_ms, Val}| R], Acc) ->
init_topology_options(R, [{'heartbeatFrequencyMS', Val}| Acc]);
init_topology_options([{min_heartbeat_frequency_ms, Val}| R], Acc) ->
init_topology_options(R, [{'minHeartbeatFrequencyMS', Val}| Acc]);
init_topology_options([_| R], Acc) ->
init_topology_options([{pool_size, Val} | R], Acc) ->
init_topology_options(R, [{pool_size, Val} | Acc]);
init_topology_options([{max_overflow, Val} | R], Acc) ->
init_topology_options(R, [{max_overflow, Val} | Acc]);
init_topology_options([{overflow_ttl, Val} | R], Acc) ->
init_topology_options(R, [{overflow_ttl, Val} | Acc]);
init_topology_options([{overflow_check_period, Val} | R], Acc) ->
init_topology_options(R, [{overflow_check_period, Val} | Acc]);
init_topology_options([{local_threshold_ms, Val} | R], Acc) ->
init_topology_options(R, [{'localThresholdMS', Val} | Acc]);
init_topology_options([{connect_timeout_ms, Val} | R], Acc) ->
init_topology_options(R, [{'connectTimeoutMS', Val} | Acc]);
init_topology_options([{socket_timeout_ms, Val} | R], Acc) ->
init_topology_options(R, [{'socketTimeoutMS', Val} | Acc]);
init_topology_options([{server_selection_timeout_ms, Val} | R], Acc) ->
init_topology_options(R, [{'serverSelectionTimeoutMS', Val} | Acc]);
init_topology_options([{wait_queue_timeout_ms, Val} | R], Acc) ->
init_topology_options(R, [{'waitQueueTimeoutMS', Val} | Acc]);
init_topology_options([{heartbeat_frequency_ms, Val} | R], Acc) ->
init_topology_options(R, [{'heartbeatFrequencyMS', Val} | Acc]);
init_topology_options([{min_heartbeat_frequency_ms, Val} | R], Acc) ->
init_topology_options(R, [{'minHeartbeatFrequencyMS', Val} | Acc]);
init_topology_options([_ | R], Acc) ->
init_topology_options(R, Acc);
init_topology_options([], Acc) ->
Acc.
@ -251,7 +265,7 @@ parse_servers(Type, Servers) when is_binary(Servers) ->
parse_servers(Type, binary_to_list(Servers));
parse_servers(Type, Servers) when is_list(Servers) ->
case string:split(Servers, ",", trailing) of
[Host | _] when Type =:= single ->
[Host | _] when Type =:= single ->
[Host];
Hosts ->
Hosts
@ -286,7 +300,7 @@ parse_srv_records(Type, Server) ->
error(service_not_found);
Services ->
case [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] of
[H | _] when Type =:= single ->
[H | _] when Type =:= single ->
[H];
Hosts ->
Hosts

View File

@ -16,10 +16,10 @@
-module(emqx_connector_mqtt).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(supervisor).
-behaviour(emqx_resource).
%% API and callbacks for supervisor
-export([ start_link/0

View File

@ -16,9 +16,10 @@
-module(emqx_connector_mysql).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
@ -86,7 +87,10 @@ on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = Stat
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
?SLOG(debug, #{msg => "mysql connector received sql query",
connector => InstId, sql => SQL, state => State}),
case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params, Timeout]}, no_handover) of
case Result = ecpool:pick_and_do(
PoolName,
{mysql, query, [SQL, Params, Timeout]},
no_handover) of
{error, Reason} ->
?SLOG(error, #{msg => "mysql connector do sql query failed",
connector => InstId, sql => SQL, reason => Reason}),

View File

@ -16,11 +16,12 @@
-module(emqx_connector_pgsql).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]).
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
@ -118,15 +119,15 @@ conn_opts(Opts) ->
conn_opts(Opts, []).
conn_opts([], Acc) ->
Acc;
conn_opts([Opt = {database, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {ssl, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {port, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {timeout, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([Opt = {ssl_opts, _}|Opts], Acc) ->
conn_opts(Opts, [Opt|Acc]);
conn_opts([_Opt|Opts], Acc) ->
conn_opts([Opt = {database, _} | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]);
conn_opts([Opt = {ssl, _} | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]);
conn_opts([Opt = {port, _} | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]);
conn_opts([Opt = {timeout, _} | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]);
conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]);
conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc).

View File

@ -17,7 +17,6 @@
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-type server() :: tuple().
@ -30,6 +29,8 @@
-export([roots/0, fields/1]).
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2

View File

@ -1,43 +0,0 @@
REBAR := rebar3
.PHONY: all
all: es
.PHONY: compile
compile:
$(REBAR) compile
.PHONY: clean
clean: distclean
.PHONY: distclean
distclean:
@rm -rf _build erl_crash.dump rebar3.crashdump
.PHONY: xref
xref:
$(REBAR) xref
.PHONY: eunit
eunit: compile
$(REBAR) eunit -v -c
$(REBAR) cover
.PHONY: ct
ct: compile
$(REBAR) as test ct -v
cover:
$(REBAR) cover
.PHONY: dialyzer
dialyzer:
$(REBAR) dialyzer
.PHONY: es
es: compile
$(REBAR) escriptize
.PHONY: elvis
elvis:
./scripts/elvis-check.sh

View File

@ -13,36 +13,6 @@ The main idea of the emqx resource is to put all the `general` code in a common
the config operations (like config validation, config dump back to files), and the state management.
And we put all the `specific` codes to the callback modules.
## Try it out
$ ./demo.sh
Eshell V11.1.8 (abort with ^G)
1> == the demo log tracer <<"log_tracer_clientid_shawn">> started.
config: #{<<"config">> =>
#{<<"bulk">> => <<"10KB">>,<<"cache_log_dir">> => <<"/tmp">>,
<<"condition">> => #{<<"clientid">> => <<"abc">>},
<<"level">> => <<"debug">>},
<<"id">> => <<"log_tracer_clientid_shawn">>,
<<"resource_type">> => <<"log_tracer">>}
1> emqx_resource_instance:health_check(<<"log_tracer_clientid_shawn">>).
== the demo log tracer <<"log_tracer_clientid_shawn">> is working well
state: #{health_checked => 1,logger_handler_id => abc}
ok
2> emqx_resource_instance:health_check(<<"log_tracer_clientid_shawn">>).
== the demo log tracer <<"log_tracer_clientid_shawn">> is working well
state: #{health_checked => 2,logger_handler_id => abc}
ok
3> emqx_resource_instance:query(<<"log_tracer_clientid_shawn">>, get_log).
== the demo log tracer <<"log_tracer_clientid_shawn">> received request: get_log
state: #{health_checked => 2,logger_handler_id => abc}
"this is a demo log messages..."
4> emqx_resource_instance:remove(<<"log_tracer_clientid_shawn">>).
== the demo log tracer <<"log_tracer_clientid_shawn">> stopped.
state: #{health_checked => 0,logger_handler_id => abc}
ok
5> emqx_resource_instance:query(<<"log_tracer_clientid_shawn">>, get_log).
** exception error: {get_instance,{<<"log_tracer_clientid_shawn">>,not_found}}
See
* `test/emqx_test_resource.erl` for a minimal `emqx_resource` implementation;
* `test/emqx_resource_SUITE.erl` for examples of `emqx_resource` usage.

View File

@ -1,6 +0,0 @@
#!/bin/sh
set -e
rebar3 compile
erl -sname abc -pa _build/default/lib/*/ebin _build/default/lib/emqx_resource/examples -s demo

View File

@ -1,15 +0,0 @@
%% -*- mode: erlang -*-
[{elvis, [{config, [
#{dirs => ["src"],
filter => "*.erl",
%ignore => [],
ruleset => erl_files,
rules => [{elvis_style, operator_spaces, #{
rules => [{right, ","},
{right, "|"},
{left, "|"},
{right, "||"},
{left, "||"}]}},
{elvis_style, god_modules, #{limit => 100}}]}
]}]}].

View File

@ -1,13 +0,0 @@
-module(demo).
-export([start/0]).
start() ->
code:load_file(log_tracer),
code:load_file(log_tracer_schema),
{ok, _} = application:ensure_all_started(minirest),
{ok, _} = application:ensure_all_started(emqx_resource),
emqx_resource:load_instances("./_build/default/lib/emqx_resource/examples"),
Handlers = [{"/", minirest:handler(#{modules => [log_tracer]})}],
Dispatch = [{"/[...]", minirest, Handlers}],
minirest:start_http(?MODULE, #{socket_opts => [inet, {port, 9900}]}, Dispatch).

View File

@ -1,152 +0,0 @@
---
theme: gaia
color: #000
colorSecondary: #333
backgroundColor: #fff
backgroundImage: url('https://marp.app/assets/hero-background.jpg')
paginate: true
marp: true
---
<!-- _class: lead -->
# EMQ X Resource
---
## What is it for
The [emqx_resource](https://github.com/emqx/emqx/tree/master/apps/emqx_resource) is a behavior that manages configuration specs and runtime states for resources like mysql or redis backends.
It is intended to be used by the emqx_bridges and all other resources that need CRUD operations to their configs, and need to initialize the states when creating.
---
<!-- _class: lead -->
# The Demo
The bridge for mysql
---
## The callback module 'emqx_mysql_connector'
1. include the emqx_resource_behaviour.hrl:
```
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
```
---
2. provide the hocon schema for validating the configs:
```
schema() ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
...
```
---
3. write the callback functions for starting or stopping the resource instance:
```
on_start/2,
on_stop/2,
on_query/4,
on_health_check/2
```
---
## Start the emqx_bridge
```
application:ensure_all_started(emqx_bridge).
```
---
## To use the mysql resource from code:
```
emqx_resource:query(ResourceID, {sql, SQL}).
```
```
(emqx@127.0.0.1)2> emqx_resource:list_instances_verbose().
[#{config =>
#{<<"auto_reconnect">> => true,<<"cacertfile">> => [],
<<"certfile">> => [],<<"database">> => "mqtt",
<<"keyfile">> => [],<<"password">> => "public",
<<"pool_size">> => 1,
<<"server">> => {{127,0,0,1},3306},
<<"ssl">> => false,<<"username">> => "root",
<<"verify">> => false},
id => <<"bridge:mysql-def">>,mod => emqx_connector_mysql,
state => #{poolname => 'bridge:mysql-def'},
status => started}]
(emqx@127.0.0.1)3> emqx_resource:query(<<"bridge:mysql-def">>, {sql, <<"SELECT count(1)">>}).
{ok,[<<"count(1)">>],[[1]]}
```
---
## To get all available data bridges:
```
curl -q --basic -u admin:public -X GET "http://localhost:8081/api/v4/data_bridges/" | jq .
```
---
## Create
To create a mysql data bridge:
```
BridgeMySQL='{
"type": "mysql",
"status": "started",
"config": {
"verify": false,
"username": "root",
"ssl": false,
"server": "127.0.0.1:3306",
"pool_size": 1,
"password": "public",
"keyfile": "",
"database": "mqtt",
"certfile": "",
"cacertfile": "",
"auto_reconnect": true
}
}'
curl -q --basic -u admin:public -X POST "http://localhost:8081/api/v4/data_bridges/mysql-aaaa" -d $BridgeMySQL | jq .
```
---
## Update
To update an existing data bridge:
```
BridgeMySQL='{
"type": "mysql",
"status": "started",
"config": {
"verify": false,
"username": "root",
"ssl": false,
"server": "127.0.0.1:3306",
"pool_size": 2,
"password": "public",
"keyfile": "",
"database": "mqtt",
"certfile": "",
"cacertfile": "",
"auto_reconnect": true
}
}'
curl -q --basic -u admin:public -X PUT "http://localhost:8081/api/v4/data_bridges/mysql-aaaa" -d $BridgeMySQL | jq .
```

View File

@ -1,11 +0,0 @@
{
"id": "log_tracer_clientid_shawn"
"resource_type": "log_tracer"
"config": {
"condition": {"app": "emqx"}
"level": "debug"
"cache_log_dir": "/tmp"
"bulk": "10KB"
"chars_limit": 1024
}
}

View File

@ -1,43 +0,0 @@
-module(log_tracer).
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
, on_query/4
, on_health_check/2
, on_api_reply_format/1
, on_config_merge/3
]).
%% callbacks for emqx_resource config schema
-export([schema/0]).
schema() ->
log_tracer_schema:schema().
on_start(InstId, Config) ->
io:format("== the demo log tracer ~p started.~nconfig: ~p~n", [InstId, Config]),
{ok, #{logger_handler_id => abc, health_checked => 0}}.
on_stop(InstId, State) ->
io:format("== the demo log tracer ~p stopped.~nstate: ~p~n", [InstId, State]),
ok.
on_query(InstId, Request, AfterQuery, State) ->
io:format("== the demo log tracer ~p received request: ~p~nstate: ~p~n",
[InstId, Request, State]),
emqx_resource:query_success(AfterQuery),
"this is a demo log messages...".
on_health_check(InstId, State = #{health_checked := Checked}) ->
NState = State#{health_checked => Checked + 1},
io:format("== the demo log tracer ~p is working well~nstate: ~p~n", [InstId, NState]),
{ok, NState}.
on_api_reply_format(#{id := Id, status := Status, state := #{health_checked := NChecked}}) ->
#{id => Id, status => Status, checked_count => NChecked}.
on_config_merge(OldConfig, NewConfig, _Params) ->
maps:merge(OldConfig, NewConfig).

View File

@ -1,44 +0,0 @@
-module(log_tracer_schema).
-include_lib("typerefl/include/types.hrl").
-export([schema/0]).
-reflect_type([t_level/0, t_cache_logs_in/0]).
-type t_level() :: debug | info | notice | warning | error | critical | alert | emergency.
-type t_cache_logs_in() :: memory | file.
schema() ->
[ {condition, fun condition/1}
, {level, fun level/1}
, {enable_cache, fun enable_cache/1}
, {cache_logs_in, fun cache_logs_in/1}
, {cache_log_dir, fun cache_log_dir/1}
, {bulk, fun bulk/1}
].
condition(mapping) -> "config.condition";
condition(type) -> map();
condition(_) -> undefined.
level(mapping) -> "config.level";
level(type) -> t_level();
level(_) -> undefined.
enable_cache(mapping) -> "config.enable_cache";
enable_cache(type) -> boolean();
enable_cache(_) -> undefined.
cache_logs_in(mapping) -> "config.cache_logs_in";
cache_logs_in(type) -> t_cache_logs_in();
cache_logs_in(_) -> undefined.
cache_log_dir(mapping) -> "config.cache_log_dir";
cache_log_dir(type) -> typerefl:regexp_string("^(.*)$");
cache_log_dir(_) -> undefined.
bulk(mapping) -> "config.bulk";
bulk(type) -> typerefl:regexp_string("^[. 0-9]+(B|KB|MB|GB)$");
bulk(_) -> undefined.

View File

@ -27,7 +27,7 @@
state => resource_state(),
status => started | stopped
}.
-type resource_group() :: binary().
-type after_query() :: {OnSuccess :: after_query_fun(), OnFailed :: after_query_fun()} |
undefined.

View File

@ -1,18 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-include_lib("emqx_resource/include/emqx_resource.hrl").
-behaviour(emqx_resource).
-compile({parse_transform, emqx_resource_transform}).

View File

@ -1,17 +0,0 @@
#!/bin/bash
set -euo pipefail
ELVIS_VERSION='1.0.0-emqx-2'
elvis_version="${2:-$ELVIS_VERSION}"
echo "elvis -v: $elvis_version"
if [ ! -f ./elvis ] || [ "$(./elvis -v | grep -oE '[1-9]+\.[0-9]+\.[0-9]+\-emqx-[0-9]+')" != "$elvis_version" ]; then
curl -fLO "https://github.com/emqx/elvis/releases/download/$elvis_version/elvis"
chmod +x ./elvis
fi
./elvis rock --config elvis.config

View File

@ -21,15 +21,9 @@
%% APIs for resource types
-export([ get_type/1
, list_types/0
, list_types_verbose/0
]).
-export([list_types/0]).
-export([ discover_resource_mods/0
, is_resource_mod/1
, call_instance/2
]).
%% APIs for behaviour implementations
-export([ query_success/1
, query_failed/1
@ -42,7 +36,6 @@
, check_and_create_local/3
, check_and_recreate/4
, check_and_recreate_local/4
, resource_type_from_str/1
]).
%% Sync resource instances and files
@ -79,22 +72,21 @@
, list_instances_verbose/0 %% list all the instances
, get_instance/1 %% return the data of the instance
, list_instances_by_type/1 %% return all the instances of the same resource type
% , dependents/1
% , inc_counter/2 %% increment the counter of the instance
% , inc_counter/3 %% increment the counter by a given integer
, generate_id/1
, generate_id/2
, list_group_instances/1
]).
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-optional_callbacks([ on_query/4
, on_health_check/2
, on_config_merge/3
, on_jsonify/1
, on_api_reply_format/1
]).
-callback on_api_reply_format(resource_data()) -> jsx:json_term().
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
-callback on_jsonify(resource_config()) -> jsx:json_term().
@ -113,33 +105,20 @@
-callback on_health_check(instance_id(), resource_state()) ->
{ok, resource_state()} | {error, Reason:: term(), resource_state()}.
%% load specs and return the loaded resources this time.
-spec list_types_verbose() -> [resource_spec()].
list_types_verbose() ->
[get_spec(Mod) || Mod <- list_types()].
-spec list_types() -> [module()].
list_types() ->
discover_resource_mods().
-spec get_type(module()) -> {ok, resource_spec()} | {error, not_found}.
get_type(Mod) ->
case is_resource_mod(Mod) of
true -> {ok, get_spec(Mod)};
false -> {error, not_found}
end.
-spec get_spec(module()) -> resource_spec().
get_spec(Mod) ->
maps:put(<<"resource_type">>, Mod, Mod:emqx_resource_schema()).
-spec discover_resource_mods() -> [module()].
discover_resource_mods() ->
[Mod || {Mod, _} <- code:all_loaded(), is_resource_mod(Mod)].
-spec is_resource_mod(module()) -> boolean().
is_resource_mod(Mod) ->
erlang:function_exported(Mod, emqx_resource_schema, 0).
is_resource_mod(Module) ->
Info = Module:module_info(attributes),
Behaviour = proplists:get_value(behavior, Info, []) ++
proplists:get_value(behaviour, Info, []),
lists:member(?MODULE, Behaviour).
-spec query_success(after_query()) -> ok.
query_success(undefined) -> ok;
@ -155,7 +134,7 @@ query_failed({_, {OnFailed, Args}}) ->
%% APIs for resource instances
%% =================================================================================
-spec create(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data() |'already_created'} | {error, Reason :: term()}.
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, ResourceType, Config) ->
cluster_call(create_local, [InstId, ResourceType, Config]).
@ -199,12 +178,14 @@ query(InstId, Request) ->
query(InstId, Request, undefined).
%% same to above, also defines what to do when the Module:on_query success or failed
%% it is the duty of the Moudle to apply the `after_query()` functions.
%% it is the duty of the Module to apply the `after_query()` functions.
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
query(InstId, Request, AfterQuery) ->
case get_instance(InstId) of
{ok, #{mod := Mod, state := ResourceState}} ->
%% the resource state is readonly to Moudle:on_query/4
{ok, #{status := stopped}} ->
error({InstId, stopped});
{ok, #{mod := Mod, state := ResourceState, status := started}} ->
%% the resource state is readonly to Module:on_query/4
%% and the `after_query()` functions should be thread safe
Mod:on_query(InstId, Request, AfterQuery, ResourceState);
{error, Reason} ->
@ -235,9 +216,29 @@ list_instances() ->
list_instances_verbose() ->
emqx_resource_instance:list_all().
-spec list_instances_by_type(module()) -> [resource_data()].
-spec list_instances_by_type(module()) -> [instance_id()].
list_instances_by_type(ResourceType) ->
emqx_resource_instance:lookup_by_type(ResourceType).
filter_instances(fun(_, RT) when RT =:= ResourceType -> true;
(_, _) -> false
end).
-spec generate_id(term()) -> instance_id().
generate_id(Name) when is_binary(Name) ->
generate_id(?DEFAULT_RESOURCE_GROUP, Name).
-spec generate_id(resource_group(), binary()) -> instance_id().
generate_id(Group, Name) when is_binary(Group) and is_binary(Name) ->
Id = integer_to_binary(erlang:unique_integer([positive])),
<<Group/binary, "/", Name/binary, ":", Id/binary>>.
-spec list_group_instances(resource_group()) -> [instance_id()].
list_group_instances(Group) ->
filter_instances(fun(Id, _) ->
case binary:split(Id, <<"/">>) of
[Group | _] -> true;
_ -> false
end
end).
-spec call_start(instance_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
@ -286,7 +287,7 @@ check_config(ResourceType, RawConfigTerm) ->
end.
-spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
{ok, resource_data() |'already_created'} | {error, term()}.
{ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, ResourceType, RawConfig) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
@ -317,16 +318,8 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
%% =================================================================================
-spec resource_type_from_str(string()) -> {ok, resource_type()} | {error, term()}.
resource_type_from_str(ResourceType) ->
try Mod = list_to_existing_atom(str(ResourceType)),
case emqx_resource:is_resource_mod(Mod) of
true -> {ok, Mod};
false -> {error, {invalid_resource, Mod}}
end
catch error:badarg ->
{error, {resource_not_found, ResourceType}}
end.
filter_instances(Filter) ->
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
call_instance(InstId, Query) ->
emqx_resource_instance:hash_call(InstId, Query).
@ -334,9 +327,6 @@ call_instance(InstId, Query) ->
safe_apply(Func, Args) ->
?SAFE_CALL(erlang:apply(Func, Args)).
str(S) when is_binary(S) -> binary_to_list(S);
str(S) when is_list(S) -> S.
cluster_call(Func, Args) ->
case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of
{ok, _TxnId, Result} -> Result;

View File

@ -15,19 +15,9 @@
%%--------------------------------------------------------------------
-module(emqx_resource_api).
-export([ list_instances/1
, format_data/1
, stringnify/1
]).
-export([stringify/1]).
list_instances(Filter) ->
[format_data(Data) || Data <- emqx_resource:list_instances_verbose(), Filter(Data)].
format_data(#{id := Id, mod := Mod, status := Status, config := Config}) ->
#{id => Id, status => Status, resource_type => Mod,
config => emqx_resource:call_jsonify(Mod, Config)}.
stringnify(Bin) when is_binary(Bin) -> Bin;
stringnify(Str) when is_list(Str) -> list_to_binary(Str);
stringnify(Reason) ->
stringify(Bin) when is_binary(Bin) -> Bin;
stringify(Str) when is_list(Str) -> list_to_binary(Str);
stringify(Reason) ->
iolist_to_binary(io_lib:format("~p", [Reason])).

View File

@ -25,7 +25,6 @@
%% load resource instances from *.conf files
-export([ lookup/1
, list_all/0
, lookup_by_type/1
, create_local/3
]).
@ -75,12 +74,12 @@ force_lookup(InstId) ->
-spec list_all() -> [resource_data()].
list_all() ->
[Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)].
try
[Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)]
catch
error:badarg -> []
end.
-spec lookup_by_type(module()) -> [resource_data()].
lookup_by_type(ResourceType) ->
[Data || #{mod := Mod} = Data <- list_all()
, Mod =:= ResourceType].
-spec create_local(instance_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, term()}.
@ -141,7 +140,12 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
%% suppress the race condition check, as these functions are protected in gproc workers
-dialyzer({nowarn_function, [do_recreate/4, do_create/3, do_restart/1, do_stop/1, do_health_check/1]}).
-dialyzer({nowarn_function, [do_recreate/4,
do_create/3,
do_restart/1,
do_stop/1,
do_health_check/1]}).
do_recreate(InstId, ResourceType, NewConfig, Params) ->
case lookup(InstId) of
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
@ -172,7 +176,8 @@ do_create(InstId, ResourceType, Config) ->
_ = do_health_check(InstId),
{ok, force_lookup(InstId)};
{error, Reason} ->
logger:error("start ~ts resource ~ts failed: ~p", [ResourceType, InstId, Reason]),
logger:error("start ~ts resource ~ts failed: ~p",
[ResourceType, InstId, Reason]),
{error, Reason}
end
end.
@ -209,9 +214,9 @@ do_restart(InstId) ->
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
case emqx_resource:call_start(InstId, Mod, Config) of
{ok, ResourceState} ->
{ok, NewResourceState} ->
ets:insert(emqx_resource_instance,
{InstId, Data#{state => ResourceState, status => started}}),
{InstId, Data#{state => NewResourceState, status => started}}),
ok;
{error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),

View File

@ -1,70 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_transform).
-include_lib("syntax_tools/include/merl.hrl").
-export([parse_transform/2]).
parse_transform(Forms, _Opts) ->
Mod = hd([M || {attribute, _, module, M} <- Forms]),
AST = trans(Mod, proplists:delete(eof, Forms)),
_ = debug_print(Mod, AST),
AST.
-ifdef(RESOURCE_DEBUG).
debug_print(Mod, Ts) ->
{ok, Io} = file:open("./" ++ atom_to_list(Mod) ++ ".trans.erl", [write]),
_ = do_debug_print(Io, Ts),
file:close(Io).
do_debug_print(Io, Ts) when is_list(Ts) ->
lists:foreach(fun(T) -> do_debug_print(Io, T) end, Ts);
do_debug_print(Io, T) ->
io:put_chars(Io, erl_prettypr:format(merl:tree(T))),
io:nl(Io).
-else.
debug_print(_Mod, _AST) ->
ok.
-endif.
trans(Mod, Forms) ->
forms(Mod, Forms) ++ [erl_syntax:revert(erl_syntax:eof_marker())].
forms(Mod, [F0 | Fs0]) ->
case form(Mod, F0) of
{CurrForms, AppendedForms} ->
CurrForms ++ forms(Mod, Fs0) ++ AppendedForms;
{CurrForms, FollowerForms, AppendedForms} ->
CurrForms ++ FollowerForms ++ forms(Mod, Fs0) ++ AppendedForms
end;
forms(_, []) -> [].
form(Mod, Form) ->
case Form of
?Q("-module('@_').") ->
{[Form], fix_spec_attrs(), fix_spec_funcs(Mod)};
_ ->
{[Form], [], []}
end.
fix_spec_attrs() ->
[ ?Q("-export([emqx_resource_schema/0]).")
].
fix_spec_funcs(_Mod) ->
[ ?Q("emqx_resource_schema() -> <<\"demo_swagger_schema\">>.")
].

View File

@ -1,16 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_uitils).

View File

@ -0,0 +1,181 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(TEST_RESOURCE, emqx_test_resource).
-define(ID, <<"id">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_testcase(_, Config) ->
Config.
init_per_suite(Config) ->
code:ensure_loaded(?TEST_RESOURCE),
ok = emqx_common_test_helpers:start_apps([]),
{ok, _} = application:ensure_all_started(emqx_resource),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_resource]).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_list_types(_) ->
?assert(lists:member(?TEST_RESOURCE, emqx_resource:list_types())).
t_check_config(_) ->
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, bin_config()),
{ok, #{}} = emqx_resource:check_config(?TEST_RESOURCE, config()),
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, <<"not a config">>),
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}).
t_create_remove(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?TEST_RESOURCE,
#{unknown => <<"test_resource">>}),
{ok, _} = emqx_resource:create_local(
?ID,
?TEST_RESOURCE,
#{name => <<"test_resource">>}),
#{pid := Pid} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
ok = emqx_resource:remove_local(?ID),
{error, _} = emqx_resource:remove_local(?ID),
?assertNot(is_process_alive(Pid)).
t_query(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?TEST_RESOURCE,
#{name => <<"test_resource">>}),
Pid = self(),
Success = fun() -> Pid ! success end,
Failure = fun() -> Pid ! failure end,
#{pid := _} = emqx_resource:query(?ID, get_state),
#{pid := _} = emqx_resource:query(?ID, get_state, {{Success, []}, {Failure, []}}),
receive
Message -> ?assertEqual(success, Message)
after 100 ->
?assert(false)
end,
?assertException(
error,
{get_instance, _Reason},
emqx_resource:query(<<"unknown">>, get_state)),
ok = emqx_resource:remove_local(?ID).
t_healthy(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?TEST_RESOURCE,
#{name => <<"test_resource">>}),
#{pid := Pid} = emqx_resource:query(?ID, get_state),
ok = emqx_resource:health_check(?ID),
[#{status := started}] = emqx_resource:list_instances_verbose(),
erlang:exit(Pid, shutdown),
{error, dead} = emqx_resource:health_check(?ID),
[#{status := stopped}] = emqx_resource:list_instances_verbose(),
ok = emqx_resource:remove_local(?ID).
t_stop_start(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?TEST_RESOURCE,
#{unknown => <<"test_resource">>}),
{ok, _} = emqx_resource:create_local(
?ID,
?TEST_RESOURCE,
#{name => <<"test_resource">>}),
#{pid := Pid0} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)),
ok = emqx_resource:stop(?ID),
?assertNot(is_process_alive(Pid0)),
?assertException(
error,
{?ID, stopped},
emqx_resource:query(?ID, get_state)),
ok = emqx_resource:restart(?ID),
#{pid := Pid1} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)).
t_list_filter(_) ->
{ok, _} = emqx_resource:create_local(
emqx_resource:generate_id(<<"a">>),
?TEST_RESOURCE,
#{name => a}),
{ok, _} = emqx_resource:create_local(
emqx_resource:generate_id(<<"group">>, <<"a">>),
?TEST_RESOURCE,
#{name => grouped_a}),
[Id1] = emqx_resource:list_group_instances(<<"default">>),
{ok, #{config := #{name := a}}} = emqx_resource:get_instance(Id1),
[Id2] = emqx_resource:list_group_instances(<<"group">>),
{ok, #{config := #{name := grouped_a}}} = emqx_resource:get_instance(Id2).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
bin_config() ->
<<"\"name\": \"test_resource\"">>.
config() ->
{ok, Config} = hocon:binary(bin_config()),
Config.

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_test_resource).
-include_lib("typerefl/include/types.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
, on_query/4
, on_health_check/2
, on_config_merge/3
]).
%% callbacks for emqx_resource config schema
-export([roots/0]).
roots() -> [{"name", fun name/1}].
name(type) -> binary();
name(nullable) -> false;
name(_) -> undefined.
on_start(InstId, #{name := Name}) ->
{ok, #{name => Name,
id => InstId,
pid => spawn_dummy_process()}}.
on_stop(_InstId, #{pid := Pid}) ->
erlang:exit(Pid, shutdown),
ok.
on_query(_InstId, get_state, AfterQuery, State) ->
emqx_resource:query_success(AfterQuery),
State.
on_health_check(_InstId, State = #{pid := Pid}) ->
case is_process_alive(Pid) of
true -> {ok, State};
false -> {error, dead, State}
end.
on_config_merge(OldConfig, NewConfig, _Params) ->
maps:merge(OldConfig, NewConfig).
spawn_dummy_process() ->
spawn(
fun() ->
Ref = make_ref(),
receive
Ref -> ok
end
end).

View File

@ -18,7 +18,9 @@
{left, "|"},
{right, "||"},
{left, "||"}]}},
{elvis_style, dont_repeat_yourself, #{ min_complexity => 20 }}
{elvis_style, dont_repeat_yourself, #{ min_complexity => 20 }},
{elvis_style, god_modules, #{ignore => [emqx_authentication,
emqx_resource]}}
]
},
#{dirs => ["test", "apps/**/test"],