diff --git a/README.md b/README.md index 35ed465e8..a9c6e710f 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ git clone https://github.com/emqx/emqx-rel.git cd emqx-rel && make -cd _rel/emqx && ./bin/emqx console +cd _build/emqx/rel/emqx && ./bin/emqx console ``` diff --git a/etc/emqx.conf b/etc/emqx.conf index 9ce950aae..df3be2a9a 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -145,7 +145,7 @@ cluster.autoclean = 5m ## The address type is used to extract host from k8s service. ## -## Value: ip | dns +## Value: ip | dns | hostname ## cluster.k8s.address_type = ip ## The app name helps build 'node.name'. @@ -153,6 +153,11 @@ cluster.autoclean = 5m ## Value: String ## cluster.k8s.app_name = emqx +## The suffix appended to the dns name or hostname obtained from the k8s service. +## +## Value: String +## cluster.k8s.suffix = pod.cluster.local + ## Kubernates Namespace ## ## Value: String @@ -1878,6 +1883,14 @@ plugins.expand_plugins_dir = {{ platform_plugins_dir }}/ ## Default: 1m, 1 minute broker.sys_interval = 1m +## Interval for publishing the following system heartbeat messages: +## - "$SYS/brokers/<node>/uptime" +## - "$SYS/brokers/<node>/datetime" +## +## Value: Duration +## Default: 30s +broker.sys_heartbeat = 30s + ## Enable global session registry. ## ## Value: on | off diff --git a/priv/emqx.schema b/priv/emqx.schema index 1366665ca..1d308476d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -129,7 +129,7 @@ ]}. {mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [ - {datatype, {enum, [ip, dns]}} + {datatype, {enum, [ip, dns, hostname]}} ]}. {mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [ @@ -140,6 +140,11 @@ {datatype, string} ]}. +{mapping, "cluster.k8s.suffix", "ekka.cluster_discovery", [ + {datatype, string}, + {default, ""} + ]}. 
+ {translation, "ekka.cluster_discovery", fun(Conf) -> Strategy = cuttlefish:conf_get("cluster.discovery", Conf), Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, @@ -176,7 +181,8 @@ {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}, - {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}]; + {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}, + {suffix, cuttlefish:conf_get("cluster.k8s.suffix", Conf, "")}]; (manual) -> [ ] end, @@ -1807,6 +1813,11 @@ end}. {default, "1m"} ]}. +{mapping, "broker.sys_heartbeat", "emqx.broker_sys_heartbeat", [ + {datatype, {duration, ms}}, + {default, "30s"} +]}. + {mapping, "broker.enable_session_registry", "emqx.enable_session_registry", [ {default, on}, {datatype, flag} diff --git a/rebar.config b/rebar.config index 89b9436af..db7590567 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,7 @@ {gproc, "0.8.0"}, % hex {replayq, "0.1.1"}, %hex {esockd, "5.5.0"}, %hex - {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.7"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. 
diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 65d34d740..5aaa62b30 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -148,6 +148,7 @@ call(CPid, Req) -> %%-------------------------------------------------------------------- init({Transport, RawSocket, Options}) -> + process_flag(trap_exit, true), {ok, Socket} = Transport:wait(RawSocket), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), @@ -365,6 +366,16 @@ handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); +handle(info, Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:session(ProtoState) of + undefined -> + ?LOG(error, "Unexpected EXIT: ~p", [Info]), + {keep_state, State}; + SessionPid -> + ?LOG(error, "Session ~p terminated: ~p", [SessionPid, Reason]), + shutdown(Reason, State) + end; + handle(info, Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {keep_state, State}. @@ -499,6 +510,8 @@ maybe_gc(_, State) -> State. reply(From, Reply, State) -> {keep_state, State, [{reply, From, Reply}]}. +shutdown(Reason = {shutdown, _}, State) -> + stop(Reason, State); shutdown(Reason, State) -> stop({shutdown, Reason}, State). diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index 4d01b6229..96adf6605 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -23,18 +23,28 @@ -define(RPC, gen_rpc). call(Node, Mod, Fun, Args) -> - filter_result(?RPC:call(Node, Mod, Fun, Args)). + filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). multicall(Nodes, Mod, Fun, Args) -> - filter_result(?RPC:multicall(Nodes, Mod, Fun, Args)). + filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). cast(Node, Mod, Fun, Args) -> - filter_result(?RPC:cast(Node, Mod, Fun, Args)). + filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). 
+rpc_node(Node) -> + {Node, erlang:system_info(scheduler_id)}. + +rpc_nodes(Nodes) -> + rpc_nodes(Nodes, []). + +rpc_nodes([], Acc) -> + Acc; +rpc_nodes([Node | Nodes], Acc) -> + rpc_nodes(Nodes, [rpc_node(Node) | Acc]). + + +filter_result({Error, Reason}) + when Error =:= badrpc; Error =:= badtcp -> + {badrpc, Reason}; filter_result(Delivery) -> - case Delivery of - {badrpc, Reason} -> {badrpc, Reason}; - {badtcp, Reason} -> {badrpc, Reason}; - _ -> Delivery - end. - + Delivery. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 392ce77f5..7d23c9e62 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -626,7 +626,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry _ -> send_willmsg(WillMsg) end, - {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; + shutdown(Reason, State#state{will_msg = undefined, conn_pid = undefined}); handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> State1 = case Reason of @@ -652,23 +652,13 @@ handle_info(Info, State) -> terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, - username = Username, - conn_pid = ConnPid, - old_conn_pid = OldConnPid}) -> + username = Username}) -> send_willmsg(WillMsg), - [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. -maybe_shutdown(undefined, _Reason) -> - ok; -maybe_shutdown(Pid, normal) -> - Pid ! {shutdown, normal}; -maybe_shutdown(Pid, Reason) -> - exit(Pid, Reason). 
- %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 37e22856f..51d0d8cb9 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -28,6 +28,7 @@ , datetime/0 , sysdescr/0 , sys_interval/0 + , sys_heatbeat_interval/0 ]). -export([info/0]). @@ -92,6 +93,11 @@ datetime() -> sys_interval() -> application:get_env(?APP, broker_sys_interval, 60000). +%% @doc Get sys heartbeat interval in milliseconds (env 'broker_sys_heartbeat', set from 'broker.sys_heartbeat'; default 30000) +-spec(sys_heatbeat_interval() -> pos_integer()). +sys_heatbeat_interval() -> + application:get_env(?APP, broker_sys_heartbeat, 30000). + %% @doc Get sys info -spec(info() -> list(tuple())). info() -> @@ -111,7 +117,7 @@ init([]) -> {ok, heartbeat(tick(State))}. heartbeat(State) -> - State#state{heartbeat = start_timer(timer:seconds(1), heartbeat)}. + State#state{heartbeat = start_timer(sys_heatbeat_interval(), heartbeat)}. tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 9571baab8..23a4d8d4c 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -299,30 +299,49 @@ websocket_info({shutdown, Reason}, State) -> websocket_info({stop, Reason}, State) -> {stop, State#state{shutdown = Reason}}; +websocket_info(Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:session(ProtoState) of + undefined -> + ?LOG(error, "Unexpected EXIT: ~p", [Info]), + {ok, State}; + SessionPid -> + ?LOG(error, "Session ~p terminated: ~p", [SessionPid, Reason]), + shutdown(Reason, State) + end; + websocket_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {ok, State}. 
-terminate(SockError, _Req, #state{keepalive = Keepalive, - proto_state = ProtoState, - shutdown = Shutdown}) -> - ?LOG(debug, "Terminated for ~p, sockerror: ~p", - [Shutdown, SockError]), +terminate(WsReason, _Req, #state{keepalive = Keepalive, + proto_state = ProtoState, + shutdown = Shutdown}) -> + ?LOG(debug, "Terminated for ~p, websocket reason: ~p", + [Shutdown, WsReason]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of {undefined, _} -> ok; {_, {shutdown, Reason}} -> - emqx_protocol:terminate(Reason, ProtoState), - exit(Reason); - {_, Error} -> - emqx_protocol:terminate(Error, ProtoState), - exit({error, SockError}) + terminate_session(Reason, ProtoState); + {_, _Error} -> + ?LOG(info, "Terminate for unexpected error: ~p", [WsReason]), + terminate_session(unknown, ProtoState) end. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +terminate_session(Reason, ProtoState) -> + emqx_protocol:terminate(Reason, ProtoState), + case emqx_protocol:session(ProtoState) of + undefined -> + ok; + SessionPid -> + unlink(SessionPid), + SessionPid ! {'EXIT', self(), Reason} + end. + handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> case emqx_protocol:received(Packet, ProtoState) of {ok, NProtoState} -> @@ -343,6 +362,9 @@ ensure_stats_timer(State = #state{enable_stats = true, ensure_stats_timer(State) -> State. +shutdown(Reason = {shutdown, _}, State) -> + self() ! {stop, Reason}, + {ok, State}; shutdown(Reason, State) -> %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) self() ! {stop, {shutdown, Reason}},