chore(gw): integrate with emqx-authn
This commit is contained in:
parent
cf20fc6db7
commit
f36c523ae8
|
@ -28,6 +28,10 @@
|
|||
-export([ authenticator_name/1
|
||||
]).
|
||||
|
||||
%% Export it for emqx_gateway_schema module
|
||||
-export([ authenticators/1
|
||||
]).
|
||||
|
||||
structs() -> [ "authentication" ].
|
||||
|
||||
fields("authentication") ->
|
||||
|
|
|
@ -16,7 +16,14 @@ gateway: {
|
|||
password: "${Packet.headers.passcode}"
|
||||
}
|
||||
|
||||
authenticator: allow_anonymous
|
||||
authenticators: [
|
||||
{
|
||||
name: "authenticator1"
|
||||
mechanism: password-based
|
||||
server_type: built-in-database
|
||||
user_id_type: clientid
|
||||
}
|
||||
]
|
||||
|
||||
listener.tcp.1: {
|
||||
bind: 61613
|
||||
|
@ -29,7 +36,7 @@ gateway: {
|
|||
|
||||
coap.1: {
|
||||
enable_stats: false
|
||||
authenticator: allow_anonymous
|
||||
authenticators: []
|
||||
heartbeat: 30s
|
||||
resource: mqtt
|
||||
notify_type: qos
|
||||
|
@ -42,7 +49,7 @@ gateway: {
|
|||
|
||||
coap.2: {
|
||||
enable_stats: false
|
||||
authenticator: allow_anonymous
|
||||
authenticators: []
|
||||
heartbeat: 30s
|
||||
resource: pubsub
|
||||
notify_type: non
|
||||
|
@ -114,7 +121,7 @@ gateway: {
|
|||
#ssl.cacertfile:
|
||||
}
|
||||
|
||||
authenticator: allow_anonymous
|
||||
authenticators: []
|
||||
|
||||
listener.tcp.1: {
|
||||
bind: 7993
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
%% Gateway ID
|
||||
, type := gateway_type()
|
||||
%% Autenticator
|
||||
, auth := emqx_authn:chain_id()
|
||||
, auth := emqx_authn:chain_id() | undefined
|
||||
%% The ConnectionManager PID
|
||||
, cm := pid()
|
||||
}.
|
||||
|
@ -65,6 +65,8 @@
|
|||
-spec authenticate(context(), emqx_types:clientinfo())
|
||||
-> {ok, emqx_types:clientinfo()}
|
||||
| {error, any()}.
|
||||
authenticate(_Ctx = #{auth := undefined}, ClientInfo) ->
|
||||
{ok, mountpoint(ClientInfo)};
|
||||
authenticate(_Ctx = #{auth := ChainId}, ClientInfo0) ->
|
||||
ClientInfo = ClientInfo0#{
|
||||
zone => default,
|
||||
|
@ -78,7 +80,7 @@ authenticate(_Ctx = #{auth := ChainId}, ClientInfo0) ->
|
|||
{error, Reason}
|
||||
end;
|
||||
authenticate(_Ctx, ClientInfo) ->
|
||||
{ok, ClientInfo}.
|
||||
{ok, mountpoint(ClientInfo)}.
|
||||
|
||||
%% @doc Register the session to the cluster.
|
||||
%%
|
||||
|
|
|
@ -86,8 +86,8 @@ call(Pid, Req) ->
|
|||
|
||||
init([Insta, Ctx0, _GwDscrptr]) ->
|
||||
process_flag(trap_exit, true),
|
||||
#{rawconf := RawConf} = Insta,
|
||||
Ctx = do_init_context(RawConf, Ctx0),
|
||||
#{id := InstaId, rawconf := RawConf} = Insta,
|
||||
Ctx = do_init_context(InstaId, RawConf, Ctx0),
|
||||
State = #state{
|
||||
insta = Insta,
|
||||
ctx = Ctx,
|
||||
|
@ -103,16 +103,16 @@ init([Insta, Ctx0, _GwDscrptr]) ->
|
|||
{ok, NState}
|
||||
end.
|
||||
|
||||
do_init_context(RawConf, Ctx) ->
|
||||
Auth = case maps:get(authenticator, RawConf, allow_anonymous) of
|
||||
allow_anonymous -> allow_anonymous;
|
||||
Funcs when is_list(Funcs) ->
|
||||
create_authenticator_for_gateway_insta(Funcs)
|
||||
do_init_context(InstaId, RawConf, Ctx) ->
|
||||
Auth = case maps:get(authenticators, RawConf, []) of
|
||||
[] -> undefined;
|
||||
AuthCfgs when is_list(AuthCfgs) ->
|
||||
create_authenticators_for_gateway_insta(InstaId, AuthCfgs)
|
||||
end,
|
||||
Ctx#{auth => Auth}.
|
||||
|
||||
do_deinit_context(Ctx) ->
|
||||
cleanup_authenticator_for_gateway_insta(maps:get(auth, Ctx)),
|
||||
cleanup_authenticators_for_gateway_insta(maps:get(auth, Ctx)),
|
||||
ok.
|
||||
|
||||
handle_call(info, _From, State = #state{insta = Insta}) ->
|
||||
|
@ -213,13 +213,42 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
create_authenticator_for_gateway_insta(_Funcs) ->
|
||||
todo.
|
||||
%% @doc AuthCfgs is a array of authenticatior configurations,
|
||||
%% see: emqx_authn_schema:authenticators/1
|
||||
create_authenticators_for_gateway_insta(InstaId0, AuthCfgs) ->
|
||||
ChainId = atom_to_binary(InstaId0, utf8),
|
||||
case emqx_authn:create_chain(#{id => ChainId}) of
|
||||
{ok, _ChainInfo} ->
|
||||
Results = lists:map(fun(AuthCfg = #{name := Name}) ->
|
||||
case emqx_authn:create_authenticator(
|
||||
ChainId,
|
||||
AuthCfg) of
|
||||
{ok, _AuthInfo} -> ok;
|
||||
{error, Reason} -> {Name, Reason}
|
||||
end
|
||||
end, AuthCfgs),
|
||||
NResults = [ E || E <- Results, E /= ok],
|
||||
NResults /= [] andalso begin
|
||||
logger:error("Failed to create authenticators: ~p", [NResults]),
|
||||
throw({bad_autheticators, NResults})
|
||||
end, ok;
|
||||
{error, Reason} ->
|
||||
logger:error("Failed to create authenticator chain: ~p", [Reason]),
|
||||
throw({bad_chain, {ChainId, Reason}})
|
||||
end.
|
||||
|
||||
cleanup_authenticator_for_gateway_insta(allow_anonymouse) ->
|
||||
cleanup_authenticators_for_gateway_insta(undefined) ->
|
||||
ok;
|
||||
cleanup_authenticator_for_gateway_insta(_ChainId) ->
|
||||
todo.
|
||||
cleanup_authenticators_for_gateway_insta(ChainId) ->
|
||||
case emqx_authn:delete_chain(ChainId) of
|
||||
ok -> ok;
|
||||
{error, {not_found, _}} ->
|
||||
logger:warning("Failed clean authenticator chain: ~s, "
|
||||
"reason: not_found", [ChainId]);
|
||||
{error, Reason} ->
|
||||
logger:error("Failed clean authenticator chain: ~s, "
|
||||
"reason: ~p", [ChainId, Reason])
|
||||
end.
|
||||
|
||||
cb_insta_destroy(State = #state{insta = Insta = #{type := Type},
|
||||
insta_state = InstaState}) ->
|
||||
|
|
|
@ -46,7 +46,7 @@ fields(stomp) ->
|
|||
fields(stomp_structs) ->
|
||||
[ {frame, t(ref(stomp_frame))}
|
||||
, {clientinfo_override, t(ref(clientinfo_override))}
|
||||
, {authenticator, t(union([allow_anonymous]))}
|
||||
, {authenticators, fun emqx_authn_schema:authenticators/1}
|
||||
, {listener, t(ref(tcp_listener_group))}
|
||||
];
|
||||
|
||||
|
@ -97,7 +97,7 @@ fields(exproto) ->
|
|||
fields(exproto_structs) ->
|
||||
[ {server, t(ref(exproto_grpc_server))}
|
||||
, {handler, t(ref(exproto_grpc_handler))}
|
||||
, {authenticator, t(union([allow_anonymous]))}
|
||||
, {authenticators, fun emqx_authn_schema:authenticators/1}
|
||||
, {listener, t(ref(udp_tcp_listener_group))}
|
||||
];
|
||||
|
||||
|
@ -209,7 +209,7 @@ fields(coap) ->
|
|||
|
||||
fields(coap_structs) ->
|
||||
[ {enable_stats, t(boolean(), undefined, true)}
|
||||
, {authenticator, t(union([allow_anonymous]))}
|
||||
, {authenticators, fun emqx_authn_schema:authenticators/1}
|
||||
, {heartbeat, t(duration(), undefined, "15s")}
|
||||
, {resource, t(union([mqtt, pubsub]), undefined, mqtt)}
|
||||
, {notify_type, t(union([non, con, qos]), undefined, qos)}
|
||||
|
|
|
@ -109,7 +109,7 @@ format_listenon({Addr, Port}) when is_tuple(Addr) ->
|
|||
|
||||
-type rawconf() ::
|
||||
#{ clientinfo_override => #{}
|
||||
, authenticators := #{}
|
||||
, authenticators := list()
|
||||
, listeners => listener()
|
||||
, atom() => any()
|
||||
}.
|
||||
|
|
Loading…
Reference in New Issue