diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 2e5797f50..38fc7987d 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -30,8 +30,8 @@ gateway.stomp { } authentication: { - name = "authenticator1" - type = "password-based:built-in-database" + mechanism = password-based + backend = built-in-database user_id_type = clientid } @@ -45,6 +45,12 @@ gateway.stomp { "allow all" ] + authentication: { + mechanism = password-based + backend = built-in-database + user_id_type = username + } + ## TCP options ## See ${example_common_tcp_options} for more information tcp.active_n = 100 diff --git a/apps/emqx_gateway/include/emqx_gateway.hrl b/apps/emqx_gateway/include/emqx_gateway.hrl index 5c0893cb2..8b2081a90 100644 --- a/apps/emqx_gateway/include/emqx_gateway.hrl +++ b/apps/emqx_gateway/include/emqx_gateway.hrl @@ -19,8 +19,6 @@ -type gateway_name() :: atom(). --type listener() :: #{}. - %% @doc The Gateway defination -type gateway() :: #{ name := gateway_name() diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 543b2e169..af34a1754 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -81,10 +81,13 @@ %% Frame Module frame_mod :: atom(), %% Channel Module - chann_mod :: atom() + chann_mod :: atom(), + %% Listener Tag + listener :: listener() | undefined }). --type(state() :: #state{}). +-type listener() :: {GwName :: atom(), LisType :: atom(), LisName :: atom()}. +-type state() :: #state{}. -define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -279,7 +282,8 @@ init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) -> idle_timer = IdleTimer, oom_policy = OomPolicy, frame_mod = FrameMod, - chann_mod = ChannMod + chann_mod = ChannMod, + listener = maps:get(listener, Options, undefined) }. run_loop(Parent, State = #state{socket = Socket, diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index 8022c3797..a790645c5 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -30,7 +30,7 @@ #{ %% Gateway Name gwname := gateway_name() %% Autenticator - , auth := emqx_authn:chain_id() | undefined + , auth := emqx_authentication:chain_name() | undefined %% The ConnectionManager PID , cm := pid() }. @@ -66,12 +66,8 @@ | {error, any()}. authenticate(_Ctx = #{auth := undefined}, ClientInfo) -> {ok, mountpoint(ClientInfo)}; -authenticate(_Ctx = #{auth := ChainId}, ClientInfo0) -> - ClientInfo = ClientInfo0#{ - zone => default, - listener => {tcp, default}, - chain_id => ChainId - }, +authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) -> + ClientInfo = ClientInfo0#{zone => default}, case emqx_access_control:authenticate(ClientInfo) of {ok, _} -> {ok, mountpoint(ClientInfo)}; diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 39115f114..238bcaa1f 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -43,6 +43,7 @@ name :: gateway_name(), config :: emqx_config:config(), ctx :: emqx_gateway_ctx:context(), + authns :: [emqx_authentication:chain_name()], status :: stopped | running, child_pids :: [pid()], gw_state :: emqx_gateway_impl:state() | undefined, @@ -174,9 +175,9 @@ handle_info(Info, State) -> ?LOG(warning, "Unexcepted info: ~p", [Info]), {noreply, State}. -terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) -> +terminate(_Reason, State = #state{child_pids = Pids}) -> Pids /= [] andalso (_ = cb_gateway_unload(State)), - _ = do_deinit_authn(maps:get(auth, Ctx, undefined)), + _ = do_deinit_authn(State#state.authns), ok. code_change(_OldVsn, State, _Extra) -> @@ -197,52 +198,100 @@ detailed_gateway_info(State) -> %% Internal funcs %%-------------------------------------------------------------------- -do_init_authn(GwName, Config) -> - case maps:get(authentication, Config, #{enable => false}) of - #{enable := false} -> undefined; - AuthCfg when is_map(AuthCfg) -> - case maps:get(enable, AuthCfg, true) of - false -> - undefined; - _ -> - %% TODO: Implement Authentication - GwName - %case emqx_authn:create_chain(#{id => ChainId}) of - % {ok, _ChainInfo} -> - % case emqx_authn:create_authenticator(ChainId, AuthCfg) of - % {ok, _} -> ChainId; - % {error, Reason} -> - % ?LOG(error, "Failed to create authentication ~p", [Reason]), - % throw({bad_authentication, Reason}) - % end; - % {error, Reason} -> - % ?LOG(error, "Failed to create authentication chain: ~p", [Reason]), - % throw({bad_chain, {ChainId, Reason}}) - %end. - end; - _ -> - undefined +%% same with emqx_authentication:global_chain/1 +global_chain(mqtt) -> + 'mqtt:global'; +global_chain('mqtt-sn') -> + 'mqtt-sn:global'; +global_chain(coap) -> + 'coap:global'; +global_chain(lwm2m) -> + 'lwm2m:global'; +global_chain(stomp) -> + 'stomp:global'; +global_chain(_) -> + 'unknown:global'. + +listener_chain(GwName, Type, LisName) -> + emqx_gateway_utils:listener_id(GwName, Type, LisName). + +%% There are two layer authentication configs +%% stomp.authn +%% / \ +%% listeners.tcp.defautl.authn *.ssl.default.authn +%% + +init_authn(GwName, Config) -> + Authns = authns(GwName, Config), + try + do_init_authn(Authns, []) + catch + throw : Reason = {badauth, _} -> + do_deinit_authn(proplists:get_keys(Authns)), + throw(Reason) end. -do_deinit_authn(undefined) -> - ok; -do_deinit_authn(AuthnRef) -> - %% TODO: - ?LOG(warning, "Failed to clean authn ~p, not suppported now", [AuthnRef]). - %case emqx_authn:delete_chain(AuthnRef) of - % ok -> ok; - % {error, {not_found, _}} -> - % ?LOG(warning, "Failed to clean authentication chain: ~s, " - % "reason: not_found", [AuthnRef]); - % {error, Reason} -> - % ?LOG(error, "Failed to clean authentication chain: ~s, " - % "reason: ~p", [AuthnRef, Reason]) - %end. +do_init_authn([], Names) -> + Names; +do_init_authn([{_ChainName, _AuthConf = #{enable := false}}|More], Names) -> + do_init_authn(More, Names); +do_init_authn([{ChainName, AuthConf}|More], Names) -> + _ = application:ensure_all_started(emqx_authn), + do_create_authn_chain(ChainName, AuthConf), + do_init_authn(More, [ChainName|Names]). + +authns(GwName, Config) -> + Listeners = maps:to_list(maps:get(listeners, Config, #{})), + lists:append( + [ [{listener_chain(GwName, LisType, LisName), authn_conf(Opts)} + || {LisName, Opts} <- maps:to_list(LisNames) ] + || {LisType, LisNames} <- Listeners]) + ++ [{global_chain(GwName), authn_conf(Config)}]. + +authn_conf(Conf) -> + maps:get(authentication, Conf, #{enable => false}). + +do_create_authn_chain(ChainName, AuthConf) -> + case ensure_chain(ChainName) of + ok -> + case emqx_authentication:create_authenticator(ChainName, AuthConf) of + {ok, _} -> ok; + {error, Reason} -> + ?LOG(error, "Failed to create authenticator chain ~s, " + "reason: ~p, config: ~p", + [ChainName, Reason, AuthConf]), + throw({badauth, Reason}) + end; + {error, Reason} -> + ?LOG(error, "Falied to create authn chain ~s, reason ~p", + [ChainName, Reason]), + throw({badauth, Reason}) + end. + +ensure_chain(ChainName) -> + case emqx_authentication:create_chain(ChainName) of + {ok, _ChainInfo} -> + ok; + {error, {already_exists, _}} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +do_deinit_authn(Names) -> + lists:foreach(fun(ChainName) -> + case emqx_authentication:delete_chain(ChainName) of + ok -> ok; + {error, {not_found, _}} -> ok; + {error, Reason} -> + ?LOG(error, "Failed to clean authentication chain: ~s, " + "reason: ~p", [ChainName, Reason]) + end + end, Names). do_update_one_by_one(NCfg0, State = #state{ - ctx = Ctx, - config = OCfg, - status = Status}) -> + config = OCfg, + status = Status}) -> NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0), @@ -263,14 +312,9 @@ do_update_one_by_one(NCfg0, State = #state{ true -> State; false -> %% Reset Authentication first - _ = do_deinit_authn(maps:get(auth, Ctx, undefined)), - NCtx = Ctx#{ - auth => do_init_authn( - State#state.name, - NCfg - ) - }, - State#state{ctx = NCtx} + _ = do_deinit_authn(State#state.authns), + AuthnNames = init_authn(State#state.name, NCfg), + State#state{authns = AuthnNames} end, cb_gateway_update(NCfg, NState); Status == running, NEnable == false -> @@ -289,6 +333,7 @@ cb_gateway_unload(State = #state{name = GwName, #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), CbMod:on_gateway_unload(Gateway, GwState), {ok, State#state{child_pids = [], + authns = [], status = stopped, gw_state = undefined, started_at = undefined, @@ -300,6 +345,8 @@ cb_gateway_unload(State = #state{name = GwName, [GwName, GwState, Class, Reason, Stk]), {error, {Class, Reason, Stk}} + after + _ = do_deinit_authn(State#state.authns) end. %% @doc 1. Create Authentcation Context @@ -317,17 +364,18 @@ cb_gateway_load(State = #state{name = GwName, ?LOG(info, "Skipp to start ~s gateway due to disabled", [GwName]); true -> try - AuthnRef = do_init_authn(GwName, Config), - NCtx = Ctx#{auth => AuthnRef}, + AuthnNames = init_authn(GwName, Config), + NCtx = Ctx#{auth => AuthnNames}, #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), case CbMod:on_gateway_load(Gateway, NCtx) of {error, Reason} -> - do_deinit_authn(AuthnRef), + do_deinit_authn(AuthnNames), throw({callback_return_error, Reason}); {ok, ChildPidOrSpecs, GwState} -> ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ ctx = NCtx, + authns = AuthnNames, status = running, child_pids = ChildPids, gw_state = GwState, diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index abef053cb..6c2cd5a5d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -92,10 +92,10 @@ fields(coap) -> fields(lwm2m) -> [ {xml_dir, sc(binary())} - , {lifetime_min, sc(duration())} - , {lifetime_max, sc(duration())} - , {qmode_time_windonw, sc(integer())} - , {auto_observe, sc(boolean())} + , {lifetime_min, sc(duration(), "1s")} + , {lifetime_max, sc(duration(), "86400s")} + , {qmode_time_window, sc(integer(), 22)} + , {auto_observe, sc(boolean(), false)} , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} , {translators, sc(ref(translators))} , {listeners, sc(ref(udp_listeners))} @@ -154,8 +154,8 @@ fields(udp_tcp_listeners) -> ]; fields(tcp_listener) -> - [ - %% some special confs for tcp listener + [ %% some special confs for tcp listener + {acceptors, sc(integer(), 16)} ] ++ tcp_opts() ++ proxy_protocol_opts() ++ @@ -175,6 +175,8 @@ fields(udp_listener) -> common_listener_opts(); fields(dtls_listener) -> + [ {acceptors, sc(integer(), 16)} + ] ++ fields(udp_listener) ++ [{dtls, sc_meta(ref(dtls_opts), #{desc => "DTLS listener options"})}]; @@ -195,25 +197,25 @@ fields(dtls_opts) -> , ciphers => dtls }, false). -% authentication() -> -% hoconsc:union( -% [ undefined -% , hoconsc:ref(emqx_authn_mnesia, config) -% , hoconsc:ref(emqx_authn_mysql, config) -% , hoconsc:ref(emqx_authn_pgsql, config) -% , hoconsc:ref(emqx_authn_mongodb, standalone) -% , hoconsc:ref(emqx_authn_mongodb, 'replica-set') -% , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster') -% , hoconsc:ref(emqx_authn_redis, standalone) -% , hoconsc:ref(emqx_authn_redis, cluster) -% , hoconsc:ref(emqx_authn_redis, sentinel) -% , hoconsc:ref(emqx_authn_http, get) -% , hoconsc:ref(emqx_authn_http, post) -% , hoconsc:ref(emqx_authn_jwt, 'hmac-based') -% , hoconsc:ref(emqx_authn_jwt, 'public-key') -% , hoconsc:ref(emqx_authn_jwt, 'jwks') -% , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) -% ]). +authentication() -> + hoconsc:union( + [ undefined + , hoconsc:ref(emqx_authn_mnesia, config) + , hoconsc:ref(emqx_authn_mysql, config) + , hoconsc:ref(emqx_authn_pgsql, config) + , hoconsc:ref(emqx_authn_mongodb, standalone) + , hoconsc:ref(emqx_authn_mongodb, 'replica-set') + , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster') + , hoconsc:ref(emqx_authn_redis, standalone) + , hoconsc:ref(emqx_authn_redis, cluster) + , hoconsc:ref(emqx_authn_redis, sentinel) + , hoconsc:ref(emqx_authn_http, get) + , hoconsc:ref(emqx_authn_http, post) + , hoconsc:ref(emqx_authn_jwt, 'hmac-based') + , hoconsc:ref(emqx_authn_jwt, 'public-key') + , hoconsc:ref(emqx_authn_jwt, 'jwks') + , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) + ]). gateway_common_options() -> [ {enable, sc(boolean(), true)} @@ -221,16 +223,15 @@ gateway_common_options() -> , {idle_timeout, sc(duration(), <<"30s">>)} , {mountpoint, sc(binary(), <<>>)} , {clientinfo_override, sc(ref(clientinfo_override))} - , {authentication, sc(hoconsc:lazy(map()))} + , {authentication, authentication()} ]. common_listener_opts() -> [ {enable, sc(boolean(), true)} , {bind, sc(union(ip_port(), integer()))} - , {acceptors, sc(integer(), 16)} , {max_connections, sc(integer(), 1024)} , {max_conn_rate, sc(integer())} - %, {rate_limit, sc(comma_separated_list())} + , {authentication, authentication()} , {mountpoint, sc(binary(), undefined)} , {access_rules, sc(hoconsc:array(string()), [])} ]. @@ -242,8 +243,8 @@ udp_opts() -> [{udp, sc_meta(ref(udp_opts), #{})}]. proxy_protocol_opts() -> - [ {proxy_protocol, sc(boolean())} - , {proxy_protocol_timeout, sc(duration())} + [ {proxy_protocol, sc(boolean(), false)} + , {proxy_protocol_timeout, sc(duration(), "15s")} ]. sc(Type) -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index e55e0a580..9a1ca029d 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -109,10 +109,15 @@ init(ConnInfo = #{peername := {PeerHost, _}, sockname := {_, SockPort}}, Option) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Option, undefined), + ListenerId = case maps:get(listener, Option, undefined) of + undefined -> undefined; + {GwName, Type, LisName} -> + emqx_gateway_utils:listener_id(GwName, Type, LisName) + end, ClientInfo = setting_peercert_infos( Peercert, #{ zone => default - , listener => {tcp, default} + , listener => ListenerId , protocol => stomp , peerhost => PeerHost , sockport => SockPort diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 9599ef6e3..a93240207 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -106,6 +106,7 @@ start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = Cfg#{ ctx => Ctx, + listener => {GwName, Type, LisName}, %% Used for authn frame_mod => emqx_stomp_frame, chann_mod => emqx_stomp_channel },