refactor(gw): integrate with authn

This commit is contained in:
JianBo He 2021-09-16 14:04:31 +08:00
parent f3c675b139
commit 3e033b419c
8 changed files with 159 additions and 100 deletions

View File

@ -30,8 +30,8 @@ gateway.stomp {
} }
authentication: { authentication: {
name = "authenticator1" mechanism = password-based
type = "password-based:built-in-database" backend = built-in-database
user_id_type = clientid user_id_type = clientid
} }
@ -45,6 +45,12 @@ gateway.stomp {
"allow all" "allow all"
] ]
authentication: {
mechanism = password-based
backend = built-in-database
user_id_type = username
}
## TCP options ## TCP options
## See ${example_common_tcp_options} for more information ## See ${example_common_tcp_options} for more information
tcp.active_n = 100 tcp.active_n = 100

View File

@ -19,8 +19,6 @@
-type gateway_name() :: atom(). -type gateway_name() :: atom().
-type listener() :: #{}.
%% @doc The Gateway defination %% @doc The Gateway defination
-type gateway() :: -type gateway() ::
#{ name := gateway_name() #{ name := gateway_name()

View File

@ -81,10 +81,13 @@
%% Frame Module %% Frame Module
frame_mod :: atom(), frame_mod :: atom(),
%% Channel Module %% Channel Module
chann_mod :: atom() chann_mod :: atom(),
%% Listener Tag
listener :: listener() | undefined
}). }).
-type(state() :: #state{}). -type listener() :: {GwName :: atom(), LisType :: atom(), LisName :: atom()}.
-type state() :: #state{}.
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@ -279,7 +282,8 @@ init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) ->
idle_timer = IdleTimer, idle_timer = IdleTimer,
oom_policy = OomPolicy, oom_policy = OomPolicy,
frame_mod = FrameMod, frame_mod = FrameMod,
chann_mod = ChannMod chann_mod = ChannMod,
listener = maps:get(listener, Options, undefined)
}. }.
run_loop(Parent, State = #state{socket = Socket, run_loop(Parent, State = #state{socket = Socket,

View File

@ -30,7 +30,7 @@
#{ %% Gateway Name #{ %% Gateway Name
gwname := gateway_name() gwname := gateway_name()
%% Autenticator %% Autenticator
, auth := emqx_authn:chain_id() | undefined , auth := emqx_authentication:chain_name() | undefined
%% The ConnectionManager PID %% The ConnectionManager PID
, cm := pid() , cm := pid()
}. }.
@ -66,12 +66,8 @@
| {error, any()}. | {error, any()}.
authenticate(_Ctx = #{auth := undefined}, ClientInfo) -> authenticate(_Ctx = #{auth := undefined}, ClientInfo) ->
{ok, mountpoint(ClientInfo)}; {ok, mountpoint(ClientInfo)};
authenticate(_Ctx = #{auth := ChainId}, ClientInfo0) -> authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) ->
ClientInfo = ClientInfo0#{ ClientInfo = ClientInfo0#{zone => default},
zone => default,
listener => {tcp, default},
chain_id => ChainId
},
case emqx_access_control:authenticate(ClientInfo) of case emqx_access_control:authenticate(ClientInfo) of
{ok, _} -> {ok, _} ->
{ok, mountpoint(ClientInfo)}; {ok, mountpoint(ClientInfo)};

View File

@ -43,6 +43,7 @@
name :: gateway_name(), name :: gateway_name(),
config :: emqx_config:config(), config :: emqx_config:config(),
ctx :: emqx_gateway_ctx:context(), ctx :: emqx_gateway_ctx:context(),
authns :: [emqx_authentication:chain_name()],
status :: stopped | running, status :: stopped | running,
child_pids :: [pid()], child_pids :: [pid()],
gw_state :: emqx_gateway_impl:state() | undefined, gw_state :: emqx_gateway_impl:state() | undefined,
@ -174,9 +175,9 @@ handle_info(Info, State) ->
?LOG(warning, "Unexcepted info: ~p", [Info]), ?LOG(warning, "Unexcepted info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) -> terminate(_Reason, State = #state{child_pids = Pids}) ->
Pids /= [] andalso (_ = cb_gateway_unload(State)), Pids /= [] andalso (_ = cb_gateway_unload(State)),
_ = do_deinit_authn(maps:get(auth, Ctx, undefined)), _ = do_deinit_authn(State#state.authns),
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -197,52 +198,100 @@ detailed_gateway_info(State) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
do_init_authn(GwName, Config) -> %% same with emqx_authentication:global_chain/1
case maps:get(authentication, Config, #{enable => false}) of global_chain(mqtt) ->
#{enable := false} -> undefined; 'mqtt:global';
AuthCfg when is_map(AuthCfg) -> global_chain('mqtt-sn') ->
case maps:get(enable, AuthCfg, true) of 'mqtt-sn:global';
false -> global_chain(coap) ->
undefined; 'coap:global';
_ -> global_chain(lwm2m) ->
%% TODO: Implement Authentication 'lwm2m:global';
GwName global_chain(stomp) ->
%case emqx_authn:create_chain(#{id => ChainId}) of 'stomp:global';
% {ok, _ChainInfo} -> global_chain(_) ->
% case emqx_authn:create_authenticator(ChainId, AuthCfg) of 'unknown:global'.
% {ok, _} -> ChainId;
% {error, Reason} -> listener_chain(GwName, Type, LisName) ->
% ?LOG(error, "Failed to create authentication ~p", [Reason]), emqx_gateway_utils:listener_id(GwName, Type, LisName).
% throw({bad_authentication, Reason})
% end; %% There are two layer authentication configs
% {error, Reason} -> %% stomp.authn
% ?LOG(error, "Failed to create authentication chain: ~p", [Reason]), %% / \
% throw({bad_chain, {ChainId, Reason}}) %% listeners.tcp.defautl.authn *.ssl.default.authn
%end. %%
end;
_ -> init_authn(GwName, Config) ->
undefined Authns = authns(GwName, Config),
try
do_init_authn(Authns, [])
catch
throw : Reason = {badauth, _} ->
do_deinit_authn(proplists:get_keys(Authns)),
throw(Reason)
end. end.
do_deinit_authn(undefined) -> do_init_authn([], Names) ->
ok; Names;
do_deinit_authn(AuthnRef) -> do_init_authn([{_ChainName, _AuthConf = #{enable := false}}|More], Names) ->
%% TODO: do_init_authn(More, Names);
?LOG(warning, "Failed to clean authn ~p, not suppported now", [AuthnRef]). do_init_authn([{ChainName, AuthConf}|More], Names) ->
%case emqx_authn:delete_chain(AuthnRef) of _ = application:ensure_all_started(emqx_authn),
% ok -> ok; do_create_authn_chain(ChainName, AuthConf),
% {error, {not_found, _}} -> do_init_authn(More, [ChainName|Names]).
% ?LOG(warning, "Failed to clean authentication chain: ~s, "
% "reason: not_found", [AuthnRef]); authns(GwName, Config) ->
% {error, Reason} -> Listeners = maps:to_list(maps:get(listeners, Config, #{})),
% ?LOG(error, "Failed to clean authentication chain: ~s, " lists:append(
% "reason: ~p", [AuthnRef, Reason]) [ [{listener_chain(GwName, LisType, LisName), authn_conf(Opts)}
%end. || {LisName, Opts} <- maps:to_list(LisNames) ]
|| {LisType, LisNames} <- Listeners])
++ [{global_chain(GwName), authn_conf(Config)}].
authn_conf(Conf) ->
maps:get(authentication, Conf, #{enable => false}).
do_create_authn_chain(ChainName, AuthConf) ->
case ensure_chain(ChainName) of
ok ->
case emqx_authentication:create_authenticator(ChainName, AuthConf) of
{ok, _} -> ok;
{error, Reason} ->
?LOG(error, "Failed to create authenticator chain ~s, "
"reason: ~p, config: ~p",
[ChainName, Reason, AuthConf]),
throw({badauth, Reason})
end;
{error, Reason} ->
?LOG(error, "Falied to create authn chain ~s, reason ~p",
[ChainName, Reason]),
throw({badauth, Reason})
end.
ensure_chain(ChainName) ->
case emqx_authentication:create_chain(ChainName) of
{ok, _ChainInfo} ->
ok;
{error, {already_exists, _}} ->
ok;
{error, Reason} ->
{error, Reason}
end.
do_deinit_authn(Names) ->
lists:foreach(fun(ChainName) ->
case emqx_authentication:delete_chain(ChainName) of
ok -> ok;
{error, {not_found, _}} -> ok;
{error, Reason} ->
?LOG(error, "Failed to clean authentication chain: ~s, "
"reason: ~p", [ChainName, Reason])
end
end, Names).
do_update_one_by_one(NCfg0, State = #state{ do_update_one_by_one(NCfg0, State = #state{
ctx = Ctx, config = OCfg,
config = OCfg, status = Status}) ->
status = Status}) ->
NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0), NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0),
@ -263,14 +312,9 @@ do_update_one_by_one(NCfg0, State = #state{
true -> State; true -> State;
false -> false ->
%% Reset Authentication first %% Reset Authentication first
_ = do_deinit_authn(maps:get(auth, Ctx, undefined)), _ = do_deinit_authn(State#state.authns),
NCtx = Ctx#{ AuthnNames = init_authn(State#state.name, NCfg),
auth => do_init_authn( State#state{authns = AuthnNames}
State#state.name,
NCfg
)
},
State#state{ctx = NCtx}
end, end,
cb_gateway_update(NCfg, NState); cb_gateway_update(NCfg, NState);
Status == running, NEnable == false -> Status == running, NEnable == false ->
@ -289,6 +333,7 @@ cb_gateway_unload(State = #state{name = GwName,
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
CbMod:on_gateway_unload(Gateway, GwState), CbMod:on_gateway_unload(Gateway, GwState),
{ok, State#state{child_pids = [], {ok, State#state{child_pids = [],
authns = [],
status = stopped, status = stopped,
gw_state = undefined, gw_state = undefined,
started_at = undefined, started_at = undefined,
@ -300,6 +345,8 @@ cb_gateway_unload(State = #state{name = GwName,
[GwName, GwState, [GwName, GwState,
Class, Reason, Stk]), Class, Reason, Stk]),
{error, {Class, Reason, Stk}} {error, {Class, Reason, Stk}}
after
_ = do_deinit_authn(State#state.authns)
end. end.
%% @doc 1. Create Authentcation Context %% @doc 1. Create Authentcation Context
@ -317,17 +364,18 @@ cb_gateway_load(State = #state{name = GwName,
?LOG(info, "Skipp to start ~s gateway due to disabled", [GwName]); ?LOG(info, "Skipp to start ~s gateway due to disabled", [GwName]);
true -> true ->
try try
AuthnRef = do_init_authn(GwName, Config), AuthnNames = init_authn(GwName, Config),
NCtx = Ctx#{auth => AuthnRef}, NCtx = Ctx#{auth => AuthnNames},
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_load(Gateway, NCtx) of case CbMod:on_gateway_load(Gateway, NCtx) of
{error, Reason} -> {error, Reason} ->
do_deinit_authn(AuthnRef), do_deinit_authn(AuthnNames),
throw({callback_return_error, Reason}); throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, GwState} -> {ok, ChildPidOrSpecs, GwState} ->
ChildPids = start_child_process(ChildPidOrSpecs), ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{ {ok, State#state{
ctx = NCtx, ctx = NCtx,
authns = AuthnNames,
status = running, status = running,
child_pids = ChildPids, child_pids = ChildPids,
gw_state = GwState, gw_state = GwState,

View File

@ -92,10 +92,10 @@ fields(coap) ->
fields(lwm2m) -> fields(lwm2m) ->
[ {xml_dir, sc(binary())} [ {xml_dir, sc(binary())}
, {lifetime_min, sc(duration())} , {lifetime_min, sc(duration(), "1s")}
, {lifetime_max, sc(duration())} , {lifetime_max, sc(duration(), "86400s")}
, {qmode_time_windonw, sc(integer())} , {qmode_time_window, sc(integer(), 22)}
, {auto_observe, sc(boolean())} , {auto_observe, sc(boolean(), false)}
, {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))}
, {translators, sc(ref(translators))} , {translators, sc(ref(translators))}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
@ -154,8 +154,8 @@ fields(udp_tcp_listeners) ->
]; ];
fields(tcp_listener) -> fields(tcp_listener) ->
[ [ %% some special confs for tcp listener
%% some special confs for tcp listener {acceptors, sc(integer(), 16)}
] ++ ] ++
tcp_opts() ++ tcp_opts() ++
proxy_protocol_opts() ++ proxy_protocol_opts() ++
@ -175,6 +175,8 @@ fields(udp_listener) ->
common_listener_opts(); common_listener_opts();
fields(dtls_listener) -> fields(dtls_listener) ->
[ {acceptors, sc(integer(), 16)}
] ++
fields(udp_listener) ++ fields(udp_listener) ++
[{dtls, sc_meta(ref(dtls_opts), [{dtls, sc_meta(ref(dtls_opts),
#{desc => "DTLS listener options"})}]; #{desc => "DTLS listener options"})}];
@ -195,25 +197,25 @@ fields(dtls_opts) ->
, ciphers => dtls , ciphers => dtls
}, false). }, false).
% authentication() -> authentication() ->
% hoconsc:union( hoconsc:union(
% [ undefined [ undefined
% , hoconsc:ref(emqx_authn_mnesia, config) , hoconsc:ref(emqx_authn_mnesia, config)
% , hoconsc:ref(emqx_authn_mysql, config) , hoconsc:ref(emqx_authn_mysql, config)
% , hoconsc:ref(emqx_authn_pgsql, config) , hoconsc:ref(emqx_authn_pgsql, config)
% , hoconsc:ref(emqx_authn_mongodb, standalone) , hoconsc:ref(emqx_authn_mongodb, standalone)
% , hoconsc:ref(emqx_authn_mongodb, 'replica-set') , hoconsc:ref(emqx_authn_mongodb, 'replica-set')
% , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster') , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster')
% , hoconsc:ref(emqx_authn_redis, standalone) , hoconsc:ref(emqx_authn_redis, standalone)
% , hoconsc:ref(emqx_authn_redis, cluster) , hoconsc:ref(emqx_authn_redis, cluster)
% , hoconsc:ref(emqx_authn_redis, sentinel) , hoconsc:ref(emqx_authn_redis, sentinel)
% , hoconsc:ref(emqx_authn_http, get) , hoconsc:ref(emqx_authn_http, get)
% , hoconsc:ref(emqx_authn_http, post) , hoconsc:ref(emqx_authn_http, post)
% , hoconsc:ref(emqx_authn_jwt, 'hmac-based') , hoconsc:ref(emqx_authn_jwt, 'hmac-based')
% , hoconsc:ref(emqx_authn_jwt, 'public-key') , hoconsc:ref(emqx_authn_jwt, 'public-key')
% , hoconsc:ref(emqx_authn_jwt, 'jwks') , hoconsc:ref(emqx_authn_jwt, 'jwks')
% , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config)
% ]). ]).
gateway_common_options() -> gateway_common_options() ->
[ {enable, sc(boolean(), true)} [ {enable, sc(boolean(), true)}
@ -221,16 +223,15 @@ gateway_common_options() ->
, {idle_timeout, sc(duration(), <<"30s">>)} , {idle_timeout, sc(duration(), <<"30s">>)}
, {mountpoint, sc(binary(), <<>>)} , {mountpoint, sc(binary(), <<>>)}
, {clientinfo_override, sc(ref(clientinfo_override))} , {clientinfo_override, sc(ref(clientinfo_override))}
, {authentication, sc(hoconsc:lazy(map()))} , {authentication, authentication()}
]. ].
common_listener_opts() -> common_listener_opts() ->
[ {enable, sc(boolean(), true)} [ {enable, sc(boolean(), true)}
, {bind, sc(union(ip_port(), integer()))} , {bind, sc(union(ip_port(), integer()))}
, {acceptors, sc(integer(), 16)}
, {max_connections, sc(integer(), 1024)} , {max_connections, sc(integer(), 1024)}
, {max_conn_rate, sc(integer())} , {max_conn_rate, sc(integer())}
%, {rate_limit, sc(comma_separated_list())} , {authentication, authentication()}
, {mountpoint, sc(binary(), undefined)} , {mountpoint, sc(binary(), undefined)}
, {access_rules, sc(hoconsc:array(string()), [])} , {access_rules, sc(hoconsc:array(string()), [])}
]. ].
@ -242,8 +243,8 @@ udp_opts() ->
[{udp, sc_meta(ref(udp_opts), #{})}]. [{udp, sc_meta(ref(udp_opts), #{})}].
proxy_protocol_opts() -> proxy_protocol_opts() ->
[ {proxy_protocol, sc(boolean())} [ {proxy_protocol, sc(boolean(), false)}
, {proxy_protocol_timeout, sc(duration())} , {proxy_protocol_timeout, sc(duration(), "15s")}
]. ].
sc(Type) -> sc(Type) ->

View File

@ -109,10 +109,15 @@ init(ConnInfo = #{peername := {PeerHost, _},
sockname := {_, SockPort}}, Option) -> sockname := {_, SockPort}}, Option) ->
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Option, undefined), Mountpoint = maps:get(mountpoint, Option, undefined),
ListenerId = case maps:get(listener, Option, undefined) of
undefined -> undefined;
{GwName, Type, LisName} ->
emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
ClientInfo = setting_peercert_infos( ClientInfo = setting_peercert_infos(
Peercert, Peercert,
#{ zone => default #{ zone => default
, listener => {tcp, default} , listener => ListenerId
, protocol => stomp , protocol => stomp
, peerhost => PeerHost , peerhost => PeerHost
, sockport => SockPort , sockport => SockPort

View File

@ -106,6 +106,7 @@ start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
listener => {GwName, Type, LisName}, %% Used for authn
frame_mod => emqx_stomp_frame, frame_mod => emqx_stomp_frame,
chann_mod => emqx_stomp_channel chann_mod => emqx_stomp_channel
}, },