From f80cd2d98686f78af522fd7006ea4ecd6181efaf Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 11 Aug 2018 17:57:19 +0800 Subject: [PATCH] Improve the MQTT over Websocket connection --- priv/emqx.schema | 18 +- src/emqx_access_control.erl | 14 +- src/emqx_config.erl | 10 +- src/emqx_connection.erl | 88 +++---- src/emqx_gc.erl | 4 +- src/emqx_listeners.erl | 2 +- src/emqx_mqueue.erl | 58 ++--- src/emqx_packet.erl | 2 +- src/emqx_plugins.erl | 51 ++-- src/emqx_protocol.erl | 46 +--- src/emqx_router.erl | 1 - src/emqx_session.erl | 85 +++--- src/emqx_sup.erl | 3 - src/emqx_time.erl | 2 +- src/emqx_ws.erl | 103 -------- src/emqx_ws_connection.erl | 459 ++++++++++++++++----------------- src/emqx_ws_connection_sup.erl | 44 ---- src/emqx_zone.erl | 26 +- 18 files changed, 407 insertions(+), 609 deletions(-) delete mode 100644 src/emqx_ws.erl delete mode 100644 src/emqx_ws_connection_sup.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 44b5e4bc1..c502a8d24 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1372,21 +1372,21 @@ end}. ]}. {translation, "emqx.zones", fun(Conf) -> - Mapping = fun(retain_available, Val) -> + Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; - (wildcard_subscription, Val) -> + ("wildcard_subscription", Val) -> {mqtt_wildcard_subscription, Val}; - (shared_subscription, Val) -> + ("shared_subscription", Val) -> {mqtt_shared_subscription, Val}; - (Opt, Val) -> {Opt, Val} + (Opt, Val) -> + {list_to_atom(Opt), Val} end, maps:to_list( lists:foldl( fun({["zone", Name, Opt], Val}, Zones) -> maps:update_with(list_to_atom(Name), - fun(Opts) -> - [Mapping(list_to_atom(Opt), Val)|Opts] - end, [], Zones) + fun(Opts) -> [Mapping(Opt, Val)|Opts] end, + [Mapping(Opt, Val)], Zones) end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf)))) end}. @@ -1507,9 +1507,11 @@ end}. maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) - end, [{subscriptions, Subscriptions(Name)}], Acc); + end, [{list_to_atom(Opt), Val}, + {subscriptions, Subscriptions(Name)}], Acc); (_, Acc) -> Acc end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf)))) + end}. %%-------------------------------------------------------------------- diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 75e49fe07..2b7630f1e 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -22,6 +22,8 @@ -export([start_link/0, auth/2, check_acl/3, reload_acl/0, lookup_mods/1, register_mod/3, register_mod/4, unregister_mod/2, stop/0]). +-export([clean_acl_cache/1, clean_acl_cache/2]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -50,9 +52,9 @@ start_link() -> register_default_mod() -> case emqx_config:get_env(acl_file) of - {ok, File} -> - emqx_access_control:register_mod(acl, emqx_acl_internal, [File]); - undefined -> ok + undefined -> ok; + File -> + emqx_access_control:register_mod(acl, emqx_acl_internal, [File]) end. %% @doc Authenticate Client. @@ -127,6 +129,12 @@ tab_key(acl) -> acl_modules. stop() -> gen_server:stop(?MODULE, normal, infinity). +%%TODO: Support ACL cache... +clean_acl_cache(_ClientId) -> + ok. +clean_acl_cache(_ClientId, _Topic) -> + ok. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- diff --git a/src/emqx_config.erl b/src/emqx_config.erl index d1456f644..2b96f88fc 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -33,15 +33,15 @@ -define(APP, emqx). +%% @doc Get environment +-spec(get_env(Key :: atom()) -> term() | undefined). +get_env(Key) -> + get_env(Key, undefined). + -spec(get_env(Key :: atom(), Default :: term()) -> term()). get_env(Key, Default) -> application:get_env(?APP, Key, Default). -%% @doc Get environment --spec(get_env(Key :: atom()) -> {ok, any()} | undefined). -get_env(Key) -> - application:get_env(?APP, Key). - %% TODO: populate(_App) -> ok. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 89233fb43..38c100297 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -21,9 +21,8 @@ -include("emqx_misc.hrl"). -export([start_link/3]). - -export([info/1, stats/1, kick/1]). --export([get_session/1]). +-export([session/1]). -export([clean_acl_cache/1]). -export([get_rate_limit/1, set_rate_limit/2]). -export([get_pub_limit/1, set_pub_limit/2]). @@ -44,7 +43,7 @@ rate_limit, %% Traffic rate limit limit_timer, %% Rate limit timer proto_state, %% MQTT protocol state - parse_state, %% MQTT parse state + parser_state, %% MQTT parser state keepalive, %% MQTT keepalive timer enable_stats, %% Enable stats stats_timer, %% Stats timer @@ -75,7 +74,7 @@ stats(CPid) -> kick(CPid) -> gen_server:call(CPid, kick). -get_session(CPid) -> +session(CPid) -> gen_server:call(CPid, session, infinity). clean_acl_cache(CPid) -> @@ -100,22 +99,20 @@ set_pub_limit(CPid, Rl = {_Rate, _Burst}) -> init([Transport, RawSocket, Options]) -> case Transport:wait(RawSocket) of {ok, Socket} -> - io:format("Options: ~p~n", [Options]), + Zone = proplists:get_value(zone, Options), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), - Zone = proplists:get_value(zone, Options), - RateLimit = init_rate_limit(proplists:get_value(rate_limit, Options)), - PubLimit = init_rate_limit(emqx_zone:get_env(Zone, publish_limit)), - EnableStats = emqx_zone:get_env(Zone, enable_stats, false), - IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), + PubLimit = rate_limit(emqx_zone:env(Zone, publish_limit)), + RateLimit = rate_limit(proplists:get_value(rate_limit, Options)), + EnableStats = emqx_zone:env(Zone, enable_stats, true), + IdleTimout = emqx_zone:env(Zone, idle_timeout, 30000), SendFun = send_fun(Transport, Socket, Peername), - ProtoState = emqx_protocol:init(#{zone => Zone, - peername => Peername, + ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, sendfun => SendFun}, Options), - ParseState = emqx_protocol:parser(ProtoState), + ParserState = emqx_protocol:parser(ProtoState), State = run_socket(#state{transport = Transport, socket = Socket, peername = Peername, @@ -124,7 +121,7 @@ init([Transport, RawSocket, Options]) -> rate_limit = RateLimit, pub_limit = PubLimit, proto_state = ProtoState, - parse_state = ParseState, + parser_state = ParserState, enable_stats = EnableStats, idle_timeout = IdleTimout}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], @@ -133,9 +130,9 @@ init([Transport, RawSocket, Options]) -> {stop, Reason} end. -init_rate_limit(undefined) -> +rate_limit(undefined) -> undefined; -init_rate_limit({Rate, Burst}) -> +rate_limit({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> @@ -152,8 +149,7 @@ send_fun(Transport, Socket, Peername) -> handle_call(info, From, State = #state{transport = Transport, socket = Socket, proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, Transport:type(Socket)} - | ?record_to_proplist(state, State, ?INFO_KEYS)], + ConnInfo = [{socktype, Transport:type(Socket)} | ?record_to_proplist(state, State, ?INFO_KEYS)], StatsInfo = element(2, handle_call(stats, From, State)), {reply, lists:append([ConnInfo, StatsInfo, ProtoInfo]), State}; @@ -169,7 +165,7 @@ handle_call(stats, _From, State = #state{transport = Transport, socket = Sock, p handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; -handle_call(get_session, _From, State = #state{proto_state = ProtoState}) -> +handle_call(session, _From, State = #state{proto_state = ProtoState}) -> {reply, emqx_protocol:session(ProtoState), State}; handle_call(clean_acl_cache, _From, State = #state{proto_state = ProtoState}) -> @@ -195,28 +191,20 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected cast: ~p", [Msg], State), {noreply, State}. -handle_info(Sub = {subscribe, _TopicTable}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:process(Sub, ProtoState) - end, State); - -handle_info(Unsub = {unsubscribe, _Topics}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:process(Unsub, ProtoState) - end, State); - -handle_info({deliver, PubOrAck}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:deliver(PubOrAck, ProtoState) - end, maybe_gc(ensure_stats_timer(State))); +handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:deliver(PubOrAck, ProtoState) of + {ok, ProtoState1} -> + {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; + {error, Reason} -> + shutdown(Reason, State); + {error, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}) + end; handle_info(emit_stats, State = #state{proto_state = ProtoState}) -> Stats = element(2, handle_call(stats, undefined, State)), emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats), - {noreply, State = #state{stats_timer = undefined}, hibernate}; + {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> shutdown(idle_timeout, State); @@ -306,20 +294,20 @@ handle_packet(<<>>, State) -> {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; handle_packet(Bytes, State = #state{incoming = Incoming, - parse_state = ParseState, + parser_state = ParserState, proto_state = ProtoState, idle_timeout = IdleTimeout}) -> - case catch emqx_frame:parse(Bytes, ParseState) of - {more, NewParseState} -> - {noreply, State#state{parse_state = NewParseState}, IdleTimeout}; + case catch emqx_frame:parse(Bytes, ParserState) of + {more, NewParserState} -> + {noreply, State#state{parser_state = NewParserState}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - ParseState1 = emqx_protocol:parser(ProtoState1), - handle_packet(Rest, State#state{incoming = count_packets(Type, Incoming), - proto_state = ProtoState1, - parse_state = ParseState1}); + ParserState1 = emqx_protocol:parser(ProtoState1), + handle_packet(Rest, State#state{incoming = count_packets(Type, Incoming), + proto_state = ProtoState1, + parser_state = ParserState1}); {error, Error} -> ?LOG(error, "Protocol error - ~p", [Error], State), shutdown(Error, State); @@ -368,16 +356,6 @@ run_socket(State = #state{transport = Transport, socket = Sock}) -> Transport:async_recv(Sock, 0, infinity), State#state{await_recv = true}. -with_proto(Fun, State = #state{proto_state = ProtoState}) -> - case Fun(ProtoState) of - {ok, ProtoState1} -> - {noreply, State#state{proto_state = ProtoState1}}; - {error, Reason} -> - shutdown(Reason, State); - {error, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) - end. - ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, idle_timeout = IdleTimeout}) -> diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 2cc0b2a1a..6b1d43207 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -27,8 +27,8 @@ -spec(conn_max_gc_count() -> integer()). conn_max_gc_count() -> case emqx_config:get_env(conn_force_gc_count) of - {ok, I} when I > 0 -> I + rand:uniform(I); - {ok, I} when I =< 0 -> undefined; + I when is_integer(I), I > 0 -> I + rand:uniform(I); + I when is_integer(I), I =< 0 -> undefined; undefined -> undefined end. diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 78fd5db80..084ffe7c2 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -38,7 +38,7 @@ start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]), + Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), TcpOptions = proplists:get_value(tcp_options, Options, []), diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 31811583f..458c301fc 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -39,32 +39,25 @@ %% %% @end +%% TODO: ... -module(emqx_mqueue). -%% TODO: XYZ -include("emqx.hrl"). - -include("emqx_mqtt.hrl"). -import(proplists, [get_value/3]). --export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, - dropped/1, stats/1]). - --define(LOW_WM, 0.2). - --define(HIGH_WM, 0.6). +-export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1]). +-export([dropped/1, stats/1]). -define(PQUEUE, emqx_pqueue). -type(priority() :: {iolist(), pos_integer()}). --type(option() :: {type, simple | priority} - | {max_length, non_neg_integer()} %% Max queue length - | {priority, list(priority())} - | {low_watermark, float()} %% Low watermark - | {high_watermark, float()} %% High watermark - | {store_qos0, boolean()}). %% Queue Qos0? +-type(options() :: #{type => simple | priority, + max_len => non_neg_integer(), + priority => list(priority()), + store_qos0 => boolean()}). -type(stat() :: {max_len, non_neg_integer()} | {len, non_neg_integer()} @@ -76,29 +69,22 @@ pseq = 0, priorities = [], %% len of simple queue len = 0, max_len = 0, - low_wm = ?LOW_WM, high_wm = ?HIGH_WM, qos0 = false, dropped = 0}). -type(mqueue() :: #mqueue{}). --export_type([mqueue/0, priority/0, option/0]). +-export_type([mqueue/0, priority/0, options/0]). -%% @doc New queue. --spec(new(iolist(), list(option())) -> mqueue()). -new(Name, Opts) -> - Type = get_value(type, Opts, simple), - MaxLen = get_value(max_length, Opts, 0), +-spec(new(iolist(), options()) -> mqueue()). +new(Name, #{type := Type, max_len := MaxLen, store_qos0 := StoreQos0}) -> init_q(#mqueue{type = Type, name = iolist_to_binary(Name), - len = 0, max_len = MaxLen, - low_wm = low_wm(MaxLen, Opts), - high_wm = high_wm(MaxLen, Opts), - qos0 = get_value(store_qos0, Opts, false)}, Opts). + len = 0, max_len = MaxLen, qos0 = StoreQos0}). -init_q(MQ = #mqueue{type = simple}, _Opts) -> +init_q(MQ = #mqueue{type = simple}) -> MQ#mqueue{q = queue:new()}; -init_q(MQ = #mqueue{type = priority}, Opts) -> - Priorities = get_value(priority, Opts, []), - init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). +init_q(MQ = #mqueue{type = priority}) -> + %%Priorities = get_value(priority, Opts, []), + init_p([], MQ#mqueue{q = ?PQUEUE:new()}). init_p([], MQ) -> MQ; @@ -110,16 +96,6 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> <> = <>, {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. -low_wm(0, _Opts) -> - undefined; -low_wm(MaxLen, Opts) -> - round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)). - -high_wm(0, _Opts) -> - undefined; -high_wm(MaxLen, Opts) -> - round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)). - -spec(name(mqueue()) -> iolist()). name(#mqueue{name = Name}) -> Name. @@ -172,8 +148,8 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, - priorities = Priorities, - max_len = MaxLen}) -> + priorities = Priorities, + max_len = MaxLen}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> case ?PQUEUE:plen(Pri, Q) >= MaxLen of diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 8baa6f088..65f125f68 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -114,7 +114,7 @@ format_variable(#mqtt_packet_connect{ format_variable(#mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode}) -> - io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReasonCode]); + io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]); format_variable(#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 837ed8a0e..0c03e827e 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -32,12 +32,11 @@ -spec(init() -> ok). init() -> case emqx_config:get_env(plugins_etc_dir) of - {ok, PluginsEtc} -> + undefined -> ok; + PluginsEtc -> CfgFiles = [filename:join(PluginsEtc, File) || - File <- filelib:wildcard("*.config", PluginsEtc)], - lists:foreach(fun init_config/1, CfgFiles); - undefined -> - ok + File <- filelib:wildcard("*.config", PluginsEtc)], + lists:foreach(fun init_config/1, CfgFiles) end. init_config(CfgFile) -> @@ -51,25 +50,24 @@ init_config(CfgFile) -> load() -> load_expand_plugins(), case emqx_config:get_env(plugins_loaded_file) of - {ok, File} -> + undefined -> %% No plugins available + ignore; + File -> ensure_file(File), - with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); - undefined -> - %% No plugins available - ignore + with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) end. load_expand_plugins() -> case emqx_config:get_env(expand_plugins_dir) of - {ok, Dir} -> + undefined -> ok; + Dir -> PluginsDir = filelib:wildcard("*", Dir), lists:foreach(fun(PluginDir) -> case filelib:is_dir(Dir ++ PluginDir) of true -> load_expand_plugin(Dir ++ PluginDir); false -> ok end - end, PluginsDir); - _ -> ok + end, PluginsDir) end. load_expand_plugin(PluginDir) -> @@ -102,7 +100,8 @@ init_expand_plugin_config(PluginDir) -> get_expand_plugin_config() -> case emqx_config:get_env(expand_plugins_dir) of - {ok, Dir} -> + undefined -> ok; + Dir -> PluginsDir = filelib:wildcard("*", Dir), lists:foldl(fun(PluginDir, Acc) -> case filelib:is_dir(Dir ++ PluginDir) of @@ -115,11 +114,9 @@ get_expand_plugin_config() -> false -> Acc end - end, [], PluginsDir); - _ -> ok + end, [], PluginsDir) end. - ensure_file(File) -> case filelib:is_file(File) of false -> write_loaded([]); true -> ok end. @@ -145,10 +142,10 @@ load_plugins(Names, Persistent) -> -spec(unload() -> list() | {error, term()}). unload() -> case emqx_config:get_env(plugins_loaded_file) of - {ok, File} -> - with_loaded_file(File, fun stop_plugins/1); undefined -> - ignore + ignore; + File -> + with_loaded_file(File, fun stop_plugins/1) end. %% stop plugins @@ -159,7 +156,9 @@ stop_plugins(Names) -> -spec(list() -> [plugin()]). list() -> case emqx_config:get_env(plugins_etc_dir) of - {ok, PluginsEtc} -> + undefined -> + []; + PluginsEtc -> CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(), Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], StartedApps = names(started_app), @@ -168,9 +167,7 @@ list() -> true -> Plugin#plugin{active = true}; false -> Plugin end - end, Plugins); - undefined -> - [] + end, Plugins) end. plugin(CfgFile) -> @@ -314,14 +311,14 @@ plugin_unloaded(Name, true) -> read_loaded() -> case emqx_config:get_env(plugins_loaded_file) of - {ok, File} -> read_loaded(File); - undefined -> {error, not_found} + undefined -> {error, not_found}; + File -> read_loaded(File) end. read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> - {ok, File} = emqx_config:get_env(plugins_loaded_file), + File = emqx_config:get_env(plugins_loaded_file), case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 47a2c16e4..705674000 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -36,7 +36,7 @@ {shared_subscription, true}, {wildcard_subscription, true}]). --record(proto_state, {sockprops, capabilities, connected, client_id, client_pid, +-record(proto_state, {zone, sockprops, capabilities, connected, client_id, client_pid, clean_start, proto_ver, proto_name, username, connprops, is_superuser, will_msg, keepalive, keepalive_backoff, session, recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0, @@ -56,15 +56,17 @@ -export_type([proto_state/0]). -init(SockProps = #{zone := Zone, peercert := Peercert}, Options) -> - MountPoint = emqx_zone:get_env(Zone, mountpoint), - Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), +init(SockProps = #{peercert := Peercert}, Options) -> + Zone = proplists:get_value(zone, Options), + MountPoint = emqx_zone:env(Zone, mountpoint), + Backoff = emqx_zone:env(Zone, keepalive_backoff, 0.75), Username = case proplists:get_value(peer_cert_as_username, Options) of cn -> esockd_peercert:common_name(Peercert); dn -> esockd_peercert:subject(Peercert); _ -> undefined end, - #proto_state{sockprops = SockProps, + #proto_state{zone = Zone, + sockprops = SockProps, capabilities = capabilities(Zone), connected = false, clean_start = true, @@ -82,7 +84,7 @@ init(SockProps = #{zone := Zone, peercert := Peercert}, Options) -> send_msg = 0}. capabilities(Zone) -> - Capabilities = emqx_zone:get_env(Zone, mqtt_capabilities, []), + Capabilities = emqx_zone:env(Zone, mqtt_capabilities, []), maps:from_list(lists:ukeymerge(1, ?CAPABILITIES, Capabilities)). parser(#proto_state{capabilities = #{max_packet_size := Size}, proto_ver = Ver}) -> @@ -128,7 +130,9 @@ received(Packet = ?PACKET(Type), ProtoState) -> {error, Reason, ProtoState} end. -process(?CONNECT_PACKET(Var), ProtoState = #proto_state{username = Username0, client_pid = ClientPid}) -> +process(?CONNECT_PACKET(Var), ProtoState = #proto_state{zone = Zone, + username = Username0, + client_pid = ClientPid}) -> #mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, is_bridge = IsBridge, @@ -160,7 +164,8 @@ process(?CONNECT_PACKET(Var), ProtoState = #proto_state{username = Username0, cl %% Generate clientId if null ProtoState2 = maybe_set_clientid(ProtoState1), %% Open session - case emqx_sm:open_session(#{clean_start => CleanStart, + case emqx_sm:open_session(#{zone => Zone, + clean_start => CleanStart, client_id => clientid(ProtoState2), username => Username, client_pid => ClientPid}) of @@ -242,23 +247,10 @@ process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), State) -> {ok, TopicFilters1} -> ok = emqx_session:subscribe(Session, {PacketId, Properties, mount(replvar(MountPoint, State), TopicFilters1)}), {ok, State}; - {stop, _} -> - {ok, State} + {stop, _} -> {ok, State} end end; -process({subscribe, RawTopicTable}, - State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> - TopicTable = parse_topic_filters(RawTopicTable), - case emqx_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of - {ok, TopicTable1} -> - emqx_session:subscribe(Session, TopicTable1); - {stop, _} -> ok - end, - {ok, State}; - %% Protect from empty topic list process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); @@ -276,16 +268,6 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopics), end, send(?UNSUBACK_PACKET(PacketId), State); -process({unsubscribe, RawTopics}, State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> - case emqx_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of - {ok, TopicTable} -> - emqx_session:unsubscribe(Session, {undefined, #{}, TopicTable}); - {stop, _} -> ok - end, - {ok, State}; - process(?PACKET(?PINGREQ), ProtoState) -> send(?PACKET(?PINGRESP), ProtoState); diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 85a6a63ad..863214617 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -39,7 +39,6 @@ -type(destination() :: node() | {binary(), node()}). -record(batch, {enabled, timer, pending}). - -record(state, {pool, id, batch :: #batch{}}). -define(ROUTE, emqx_route). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 03bcae3f2..1253de5cc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -48,7 +48,9 @@ -export([resume/2, discard/2]). -export([subscribe/2]).%%, subscribe/3]). -export([publish/3]). --export([puback/2, pubrec/2, pubrel/2, pubcomp/2]). +-export([puback/2, puback/3]). +-export([pubrec/2, pubrec/3]). +-export([pubrel/2, pubcomp/2]). -export([unsubscribe/2]). %% gen_server callbacks @@ -139,7 +141,11 @@ }). -define(TIMEOUT, 60000). + +-define(DEFAULT_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}). + -define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]). + -define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid, next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, @@ -151,16 +157,21 @@ "Session(~s): " ++ Format, [State#state.client_id | Args])). %% @doc Start a session --spec(start_link(Attrs :: map()) -> {ok, pid()} | {error, term()}). -start_link(Attrs) -> - gen_server:start_link(?MODULE, Attrs, [{hibernate_after, 10000}]). +-spec(start_link(SessAttrs :: map()) -> {ok, pid()} | {error, term()}). +start_link(SessAttrs) -> + gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, 30000}]). %%------------------------------------------------------------------------------ %% PubSub API %%------------------------------------------------------------------------------ +-spec(subscribe(pid(), list({topic(), map()}) | + {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). +%% internal call +subscribe(SPid, TopicFilters) when is_list(TopicFilters) -> + %%TODO: Parse the topic filters? + subscribe(SPid, {undefined, #{}, TopicFilters}); %% for mqtt 5.0 --spec(subscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). subscribe(SPid, SubReq = {PacketId, Props, TopicFilters}) -> gen_server:cast(SPid, {subscribe, self(), SubReq}). @@ -200,6 +211,9 @@ pubcomp(SPid, PacketId) -> gen_server:cast(SPid, {pubcomp, PacketId}). -spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok). +unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) -> + %%TODO: Parse the topic filters? + unsubscribe(SPid, {undefined, #{}, TopicFilters}); unsubscribe(SPid, UnsubReq = {PacketId, Properties, TopicFilters}) -> gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). @@ -252,40 +266,43 @@ close(SPid) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init(#{clean_start := CleanStart, client_id := ClientId, username := Username, client_pid := ClientPid}) -> +init(#{zone := Zone, + client_id := ClientId, + client_pid := ClientPid, + clean_start := CleanStart, + username := Username}) -> process_flag(trap_exit, true), true = link(ClientPid), init_stats([deliver_msg, enqueue_msg]), - {ok, Env} = emqx_config:get_env(session), - {ok, QEnv} = emqx_config:get_env(mqueue), - MaxInflight = proplists:get_value(max_inflight, Env, 0), - EnableStats = proplists:get_value(enable_stats, Env, false), - IgnoreLoopDeliver = proplists:get_value(ignore_loop_deliver, Env, false), - MQueue = emqx_mqueue:new(ClientId, QEnv), + MaxInflight = emqx_zone:env(Zone, max_inflight), State = #state{clean_start = CleanStart, binding = binding(ClientPid), client_id = ClientId, client_pid = ClientPid, username = Username, subscriptions = #{}, - max_subscriptions = proplists:get_value(max_subscriptions, Env, 0), - upgrade_qos = proplists:get_value(upgrade_qos, Env, false), + max_subscriptions = emqx_zone:env(Zone, max_subscriptions, 0), + upgrade_qos = emqx_zone:env(Zone, upgrade_qos, false), max_inflight = MaxInflight, inflight = emqx_inflight:new(MaxInflight), - mqueue = MQueue, - retry_interval = proplists:get_value(retry_interval, Env), + mqueue = init_mqueue(Zone, ClientId), + retry_interval = emqx_zone:env(Zone, retry_interval, 0), awaiting_rel = #{}, - await_rel_timeout = proplists:get_value(await_rel_timeout, Env), - max_awaiting_rel = proplists:get_value(max_awaiting_rel, Env), - expiry_interval = proplists:get_value(expiry_interval, Env), - enable_stats = EnableStats, - ignore_loop_deliver = IgnoreLoopDeliver, + await_rel_timeout = emqx_zone:env(Zone, await_rel_timeout), + max_awaiting_rel = emqx_zone:env(Zone, max_awaiting_rel), + expiry_interval = emqx_zone:env(Zone, session_expiry_interval), + enable_stats = emqx_zone:env(Zone, enable_stats, true), + ignore_loop_deliver = emqx_zone:env(Zone, ignore_loop_deliver, true), created_at = os:timestamp()}, emqx_sm:register_session(ClientId, info(State)), - emqx_hooks:run('session.created', [ClientId, Username]), - io:format("Session started: ~p~n", [self()]), + emqx_hooks:run('session.created', [ClientId]), {ok, emit_stats(State), hibernate}. +init_mqueue(Zone, ClientId) -> + emqx_mqueue:new(ClientId, #{type => simple, + max_len => emqx_zone:env(Zone, max_mqueue_len), + store_qos0 => emqx_zone:env(Zone, mqueue_store_qos0)}). + init_stats(Keys) -> lists:foreach(fun(K) -> put(K, 0) end, Keys). @@ -331,7 +348,7 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}}, - State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> + State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> ?LOG(info, "Subscribe ~p", [TopicFilters], State), {ReasonCodes, Subscriptions1} = lists:foldl(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> @@ -342,12 +359,12 @@ handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}}, SubMap; {ok, OldOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, SubOpts}), + emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), ?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State), maps:put(Topic, SubOpts, SubMap); error -> emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, SubOpts}), + emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]), maps:put(Topic, SubOpts, SubMap) end} end, {[], Subscriptions}, TopicFilters), @@ -355,14 +372,14 @@ handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}}, {noreply, emit_stats(State#state{subscriptions = Subscriptions1})}; handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, - State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> + State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> ?LOG(info, "Unsubscribe ~p", [TopicFilters], State), {ReasonCodes, Subscriptions1} = lists:foldl(fun(Topic, {RcAcc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> emqx_broker:unsubscribe(Topic, ClientId), - emqx_hooks:run('session.unsubscribed', [ClientId, Username], {Topic, SubOpts}), + emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]), {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)}; error -> {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap} @@ -473,13 +490,18 @@ handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. -%% Ignore Messages delivered by self +handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> + {noreply, lists:foldl(fun(Msg, NewState) -> + element(2, handle_info({dispatch, Topic, Msg}, NewState)) + end, State, Msgs)}; + +%% Ignore messages delivered by self handle_info({dispatch, _Topic, #message{from = ClientId}}, State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> {noreply, State}; %% Dispatch Message -handle_info({dispatch, Topic, Msg}, State) -> +handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> {noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))}; %% Do nothing if the client has been disconnected. @@ -510,11 +532,10 @@ handle_info({'EXIT', ClientPid, Reason}, {noreply, emit_stats(State1), hibernate}; handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> - %%ignore + %% ignore {noreply, State, hibernate}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> - ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", [ClientPid, Pid, Reason], State), {noreply, State, hibernate}; diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index b3378ba91..563244232 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -71,8 +71,6 @@ init([]) -> SessionSup = supervisor_spec(emqx_session_sup), %% Connection Manager CMSup = supervisor_spec(emqx_cm_sup), - %% WebSocket Connection Sup - WSConnSup = supervisor_spec(emqx_ws_connection_sup), %% Sys Sup SysSup = supervisor_spec(emqx_sys_sup), {ok, {{one_for_all, 0, 1}, @@ -84,7 +82,6 @@ init([]) -> SMSup, SessionSup, CMSup, - WSConnSup, SysSup]}}. %%-------------------------------------------------------------------- diff --git a/src/emqx_time.erl b/src/emqx_time.erl index 2e69638dc..623c4a543 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -20,7 +20,7 @@ seed() -> rand:seed(exsplus, erlang:timestamp()). now_ms() -> - now_ms(os:timestamp()). + os:system_time(milli_seconds). now_ms({MegaSecs, Secs, MicroSecs}) -> (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl deleted file mode 100644 index d7f6dc6e8..000000000 --- a/src/emqx_ws.erl +++ /dev/null @@ -1,103 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ws). - --include("emqx_mqtt.hrl"). - --import(proplists, [get_value/3]). - -%% WebSocket Loop State --record(wsocket_state, {req, peername, client_pid, max_packet_size, parser}). - --define(WSLOG(Level, Format, Args, State), - lager:Level("WsClient(~s): " ++ Format, - [esockd_net:format(State#wsocket_state.peername) | Args])). - --export([init/2]). --export([websocket_init/1]). --export([websocket_handle/2]). --export([websocket_info/2]). - -init(Req0, _State) -> - case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req0) of - undefined -> - {cowboy_websocket, Req0, #wsocket_state{}}; - Subprotocols -> - case lists:member(<<"mqtt">>, Subprotocols) of - true -> - Peername = cowboy_req:peer(Req0), - Req = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req0), - {cowboy_websocket, Req, #wsocket_state{req = Req, peername = Peername}, #{idle_timeout => 86400000}}; - false -> - Req = cowboy_req:reply(400, Req0), - {ok, Req, #wsocket_state{}} - end - end. - -websocket_init(State = #wsocket_state{req = Req}) -> - case emqx_ws_connection_sup:start_connection(self(), Req) of - {ok, ClientPid} -> - {ok, ProtoEnv} = emqx_config:get_env(protocol), - PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), - Parser = emqx_frame:initial_state(#{max_packet_size => PacketSize}), - NewState = State#wsocket_state{parser = Parser, - max_packet_size = PacketSize, - client_pid = ClientPid}, - {ok, NewState}; - Error -> - ?WSLOG(error, "Start client fail: ~p", [Error], State), - {stop, State} - end. - -websocket_handle({binary, <<>>}, State) -> - {ok, State}; -websocket_handle({binary, [<<>>]}, State) -> - {ok, State}; - -websocket_handle({binary, Data}, State = #wsocket_state{client_pid = ClientPid, parser = Parser}) -> - ?WSLOG(debug, "RECV ~p", [Data], State), - BinSize = iolist_size(Data), - emqx_metrics:inc('bytes/received', BinSize), - case catch emqx_frame:parse(iolist_to_binary(Data), Parser) of - {more, NewParser} -> - {ok, State#wsocket_state{parser = NewParser}}; - {ok, Packet, Rest} -> - gen_server:cast(ClientPid, {received, Packet, BinSize}), - websocket_handle({binary, Rest}, reset_parser(State)); - {error, Error} -> - ?WSLOG(error, "Frame error: ~p", [Error], State), - {stop, State}; - {'EXIT', Reason} -> - ?WSLOG(error, "Frame error: ~p", [Reason], State), - ?WSLOG(error, "Error data: ~p", [Data], State), - {stop, State} - end. - -websocket_info({binary, Data}, State) -> - {reply, {binary, Data}, State}; - -websocket_info({'EXIT', Pid, Reason}, State = #wsocket_state{client_pid = Pid}) -> - ?WSLOG(debug, "EXIT: ~p", [Reason], State), - {stop, State}; - -websocket_info(_Info, State) -> - {ok, State}. - -reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) -> - State#wsocket_state{parser = emqx_frame:initial_state(#{max_packet_size => PacketSize})}. - - diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 93a289636..5932b9ef7 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -14,232 +14,111 @@ -module(emqx_ws_connection). --behaviour(gen_server). - -include("emqx.hrl"). - -include("emqx_mqtt.hrl"). +-include("emqx_misc.hrl"). --import(proplists, [get_value/2, get_value/3]). - -%% API Exports --export([start_link/3]). - -%% Management and Monitor API --export([info/1, stats/1, kick/1, clean_acl_cache/2]). - -%% SUB/UNSUB Asynchronously --export([subscribe/2, unsubscribe/2]). - -%% Get the session proc? +-export([info/1]). +-export([stats/1]). +-export([kick/1]). -export([session/1]). -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +%% websocket callbacks +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). +-export([terminate/3]). -%% WebSocket Client State --record(wsclient_state, {ws_pid, peername, proto_state, keepalive, - enable_stats, force_gc_count}). +-record(state, { + request, + options, + peername, + sockname, + proto_state, + parser_state, + keepalive, + enable_stats, + stats_timer, + idle_timeout, + shutdown_reason + }). -%% recv_oct -%% Number of bytes received by the socket. +-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -%% recv_cnt -%% Number of packets received by the socket. - --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). +-define(INFO_KEYS, [peername, sockname]). -define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("WsClient(~s): " ++ Format, - [esockd_net:format(State#wsclient_state.peername) | Args])). + lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). -%% @doc Start WebSocket Client. -start_link(Env, WsPid, Req) -> - gen_server:start_link(?MODULE, [Env, WsPid, Req], - [[{hibernate_after, 10000}]]). +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ -info(CPid) -> - gen_server:call(CPid, info). +info(WSPid) -> + call(WSPid, info). -stats(CPid) -> - gen_server:call(CPid, stats). +stats(WSPid) -> + call(WSPid, stats). -kick(CPid) -> - gen_server:call(CPid, kick). +kick(WSPid) -> + call(WSPid, kick). -subscribe(CPid, TopicTable) -> - CPid ! {subscribe, TopicTable}. +session(WSPid) -> + call(WSPid, session). -unsubscribe(CPid, Topics) -> - CPid ! {unsubscribe, Topics}. - -session(CPid) -> - gen_server:call(CPid, session). - -clean_acl_cache(CPid, Topic) -> - gen_server:call(CPid, {clean_acl_cache, Topic}). - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- - -init([Options, WsPid, Req]) -> - init_stas(), - process_flag(trap_exit, true), - true = link(WsPid), - Peername = cowboy_req:peer(Req), - Headers = cowboy_req:headers(Req), - Sockname = cowboy_req:sock(Req), - Peercert = cowboy_req:cert(Req), - Zone = proplists:get_value(zone, Options), - ProtoState = emqx_protocol:init(#{zone => Zone, - peername => Peername, - sockname => Sockname, - peercert => Peercert, - sendfun => send_fun(WsPid)}, - [{ws_initial_headers, Headers} | Options]), - IdleTimeout = get_value(client_idle_timeout, Options, 30000), - EnableStats = get_value(client_enable_stats, Options, false), - ForceGcCount = emqx_gc:conn_max_gc_count(), - {ok, #wsclient_state{ws_pid = WsPid, - peername = Peername, - proto_state = ProtoState, - enable_stats = EnableStats, - force_gc_count = ForceGcCount}, IdleTimeout}. - -handle_call(info, From, State = #wsclient_state{peername = Peername, - proto_state = ProtoState}) -> - Info = [{websocket, true}, {peername, Peername} | emqx_protocol:info(ProtoState)], - {reply, Stats, _, _} = handle_call(stats, From, State), - reply(lists:append(Info, Stats), State); - -handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) -> - reply(lists:append([emqx_misc:proc_stats(), - wsock_stats(), - emqx_protocol:stats(ProtoState)]), State); - -handle_call(kick, _From, State) -> - {stop, {shutdown, kick}, ok, State}; - -handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> - reply(emqx_protocol:session(ProtoState), State); - -handle_call({clean_acl_cache, Topic}, _From, State) -> - erase({acl, publish, Topic}), - reply(ok, State); - -handle_call(Req, _From, State) -> - ?WSLOG(error, "Unexpected request: ~p", [Req], State), - reply({error, unexpected_request}, State). - -handle_cast({received, Packet, BinSize}, State = #wsclient_state{proto_state = ProtoState}) -> - put(recv_oct, get(recv_oct) + BinSize), - put(recv_cnt, get(recv_cnt) + 1), - emqx_metrics:received(Packet), - case emqx_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - {noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate}; - {error, Error} -> - ?WSLOG(error, "Protocol error - ~p", [Error], State), - shutdown(Error, State); - {error, Error, ProtoState1} -> - shutdown(Error, State#wsclient_state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#wsclient_state{proto_state = ProtoState1}) - end; - -handle_cast(Msg, State) -> - ?WSLOG(error, "unexpected msg: ~p", [Msg], State), - {noreply, State}. - -handle_info(SubReq ={subscribe, _TopicTable}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:process(SubReq, ProtoState) - end, State); - -handle_info(UnsubReq = {unsubscribe, _Topics}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:process(UnsubReq, ProtoState) - end, State); - -handle_info({deliver, PubOrAck}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:deliver(PubOrAck, ProtoState) - end, gc(State)); - -handle_info(emit_stats, State) -> - {noreply, emit_stats(State), hibernate}; - -handle_info(timeout, State) -> - shutdown(idle_timeout, State); - -handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), - shutdown(conflict, State); - -handle_info({shutdown, Reason}, State) -> - shutdown(Reason, State); - -handle_info({keepalive, start, Interval}, State) -> - ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of - {ok, KeepAlive} -> - {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; - {error, Error} -> - ?WSLOG(warning, "Keepalive error - ~p", [Error], State), - shutdown(Error, State) - end; - -handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of - {ok, KeepAlive1} -> - {noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate}; - {error, timeout} -> - ?WSLOG(debug, "Keepalive Timeout!", [], State), - shutdown(keepalive_timeout, State); - {error, Error} -> - ?WSLOG(warning, "Keepalive error - ~p", [Error], State), - shutdown(keepalive_error, State) - end; - -handle_info({'EXIT', WsPid, normal}, State = #wsclient_state{ws_pid = WsPid}) -> - stop(normal, State); - -handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) -> - ?WSLOG(error, "shutdown: ~p",[Reason], State), - shutdown(Reason, State); - -%% The session process exited unexpectedly. -handle_info({'EXIT', Pid, Reason}, State = #wsclient_state{proto_state = ProtoState}) -> - case emqx_protocol:session(ProtoState) of - Pid -> stop(Reason, State); - _ -> ?WSLOG(error, "Unexpected EXIT: ~p, Reason: ~p", [Pid, Reason], State), - {noreply, State, hibernate} - end; - -handle_info(Info, State) -> - ?WSLOG(error, "Unexpected Info: ~p", [Info], State), - {noreply, State, hibernate}. - -terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> - emqx_keepalive:cancel(KeepAlive), - case Reason of - {shutdown, Error} -> - emqx_protocol:shutdown(Error, ProtoState); - _ -> - emqx_protocol:shutdown(Reason, ProtoState) +call(WSPid, Req) -> + Mref = erlang:monitor(process, WSPid), + WSPid ! {call, {self(), Mref}, Req}, + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + Reply; + {'DOWN', Mref, _, _, Reason} -> + exit(Reason) + after 5000 -> + erlang:demonitor(Mref, [flush]), + exit(timeout) end. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +%%------------------------------------------------------------------------------ +%% WebSocket callbacks +%%------------------------------------------------------------------------------ -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- +init(Req, Opts) -> + io:format("Opts: ~p~n", [Opts]), + case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of + undefined -> + {cowboy_websocket, Req, #state{}}; + Subprotocols -> + case lists:member(<<"mqtt">>, Subprotocols) of + true -> + Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), + {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}}; + false -> + {ok, cowboy_req:reply(400, Req), #state{}} + end + end. + +websocket_init(#state{request = Req, options = Options}) -> + Peername = cowboy_req:peer(Req), + Sockname = cowboy_req:sock(Req), + Peercert = cowboy_req:cert(Req), + ProtoState = emqx_protocol:init(#{peername => Peername, + sockname => Sockname, + peercert => Peercert, + sendfun => send_fun(self())}, Options), + ParserState = emqx_protocol:parser(ProtoState), + Zone = proplists:get_value(zone, Options), + EnableStats = emqx_zone:env(Zone, enable_stats, true), + IdleTimout = emqx_zone:env(Zone, idle_timeout, 30000), + lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS), + {ok, #state{peername = Peername, + sockname = Sockname, + parser_state = ParserState, + proto_state = ProtoState, + enable_stats = EnableStats, + idle_timeout = IdleTimout}}. send_fun(WsPid) -> fun(Data) -> @@ -251,45 +130,143 @@ send_fun(WsPid) -> end. stat_fun() -> - fun() -> - {ok, get(recv_oct)} + fun() -> {ok, get(recv_oct)} end. + +websocket_handle({binary, <<>>}, State) -> + {ok, State}; +websocket_handle({binary, [<<>>]}, State) -> + {ok, State}; +websocket_handle({binary, Data}, State = #state{parser_state = ParserState, + proto_state = ProtoState}) -> + BinSize = iolist_size(Data), + put(recv_oct, get(recv_oct) + BinSize), + ?WSLOG(debug, "RECV ~p", [Data], State), + emqx_metrics:inc('bytes/received', BinSize), + case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of + {more, NewParserState} -> + {ok, State#state{parser_state = NewParserState}}; + {ok, Packet, Rest} -> + emqx_metrics:received(Packet), + put(recv_cnt, get(recv_cnt) + 1), + case emqx_protocol:received(Packet, ProtoState) of + {ok, ProtoState1} -> + websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); + {error, Error} -> + ?WSLOG(error, "Protocol error - ~p", [Error], State), + {stop, State}; + {error, Error, ProtoState1} -> + shutdown(Error, State#state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}) + end; + {error, Error} -> + ?WSLOG(error, "Frame error: ~p", [Error], State), + {stop, State}; + {'EXIT', Reason} -> + ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State), + {stop, State} end. -emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> - emit_stats(emqx_protocol:clientid(ProtoState), State). +websocket_info({call, From, info}, State = #state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + ProtoInfo = emqx_protocol:info(ProtoState), + ConnInfo = [{socktype, websocket}, {conn_state, running}, + {peername, Peername}, {sockname, Sockname}], + gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])), + {ok, State}; -emit_stats(_ClientId, State = #wsclient_state{enable_stats = false}) -> - State; -emit_stats(undefined, State) -> - State; -emit_stats(ClientId, State) -> - {reply, Stats, _, _} = handle_call(stats, undefined, State), - emqx_cm:set_client_stats(ClientId, Stats), +websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) -> + Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]), + gen_server:reply(From, Stats), + {ok, State}; + +websocket_info({call, From, kick}, State) -> + gen_server:reply(From, ok), + shutdown(kick, State); + +websocket_info({call, From, session}, State = #state{proto_state = ProtoState}) -> + gen_server:reply(From, emqx_protocol:session(ProtoState)), + {ok, State}; + +websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:deliver(PubOrAck, ProtoState) of + {ok, ProtoState1} -> + {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})}; + {error, Reason} -> + shutdown(Reason, State); + {error, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}) + end; + +websocket_info(emit_stats, State = #state{proto_state = ProtoState}) -> + Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), + emqx_protocol:stats(ProtoState)]), + emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats), + {ok, State#state{stats_timer = undefined}, hibernate}; + +websocket_info({keepalive, start, Interval}, State) -> + ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), + case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of + {ok, KeepAlive} -> + {ok, State#state{keepalive = KeepAlive}}; + {error, Error} -> + ?WSLOG(warning, "Keepalive error - ~p", [Error], State), + shutdown(Error, State) + end; + +websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> + case emqx_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + {ok, State#state{keepalive = KeepAlive1}}; + {error, timeout} -> + ?WSLOG(debug, "Keepalive Timeout!", [], State), + shutdown(keepalive_timeout, State); + {error, Error} -> + ?WSLOG(warning, "Keepalive error - ~p", [Error], State), + shutdown(keepalive_error, State) + end; + +websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> + ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), + shutdown(conflict, State); + +websocket_info({binary, Data}, State) -> + {reply, {binary, Data}, State}; + +websocket_info({shutdown, Reason}, State) -> + shutdown(Reason, State); + +websocket_info(Info, State) -> + ?WSLOG(error, "unexpected info: ~p", [Info], State), + {ok, State}. + +terminate(SockError, _Req, #state{keepalive = Keepalive, + proto_state = ProtoState, + shutdown_reason = Reason}) -> + emqx_keepalive:cancel(Keepalive), + io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]), + case Reason of + undefined -> + ok; + %%emqx_protocol:shutdown(SockError, ProtoState); + _ -> + ok%%emqx_protocol:shutdown(Reason, ProtoState) + end. + +reset_parser(State = #state{proto_state = ProtoState}) -> + State#state{parser_state = emqx_protocol:parser(ProtoState)}. + +ensure_stats_timer(State = #state{enable_stats = true, + stats_timer = undefined, + idle_timeout = Timeout}) -> + State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)}; +ensure_stats_timer(State) -> State. -wsock_stats() -> - [{Key, get(Key)}|| Key <- ?SOCK_STATS]. - -with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = Fun(ProtoState), - {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}. - -reply(Reply, State) -> - {reply, Reply, State, hibernate}. - shutdown(Reason, State) -> - stop({shutdown, Reason}, State). + {stop, State#state{shutdown_reason = Reason}}. -stop(Reason, State) -> - {stop, Reason, State}. - -gc(State) -> - Cb = fun() -> emit_stats(State) end, - emqx_gc:maybe_force_gc(#wsclient_state.force_gc_count, State, Cb). - -init_stas() -> - put(recv_oct, 0), - put(recv_cnt, 0), - put(send_oct, 0), - put(send_cnt, 0). +wsock_stats() -> + [{Key, get(Key)} || Key <- ?SOCK_STATS]. diff --git a/src/emqx_ws_connection_sup.erl b/src/emqx_ws_connection_sup.erl deleted file mode 100644 index 1216eeb75..000000000 --- a/src/emqx_ws_connection_sup.erl +++ /dev/null @@ -1,44 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ws_connection_sup). - --behavior(supervisor). - --export([start_link/0, start_connection/2]). - --export([init/1]). - --spec(start_link() -> {ok, pid()}). -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc Start a MQTT/WebSocket Connection. --spec(start_connection(pid(), cowboy_req:req()) -> {ok, pid()}). -start_connection(WsPid, Req) -> - supervisor:start_child(?MODULE, [WsPid, Req]). - -%%-------------------------------------------------------------------- -%% Supervisor callbacks -%%-------------------------------------------------------------------- - -init([]) -> - %%TODO: Cannot upgrade the environments, Use zone? - Env = lists:append(emqx_config:get_env(client, []), emqx_config:get_env(protocol, [])), - {ok, {{simple_one_for_one, 0, 1}, - [{ws_connection, {emqx_ws_connection, start_link, [Env]}, - temporary, 5000, worker, [emqx_ws_connection]}]}}. - diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 0d874a38b..830f08b89 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -17,24 +17,32 @@ -behaviour(gen_server). -export([start_link/0]). --export([get_env/2, get_env/3]). + +-export([env/2, env/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, {timer}). + -define(TAB, ?MODULE). --define(SERVER, ?MODULE). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -get_env(Zone, Par) -> - get_env(Zone, Par, undefined). +env(undefined, Par) -> + emqx_config:get_env(Par); +env(Zone, Par) -> + env(Zone, Par, undefined). -get_env(Zone, Par, Def) -> - try ets:lookup_element(?TAB, {Zone, Par}, 2) catch error:badarg -> Def end. +env(undefined, Par, Default) -> + emqx_config:get_env(Par, Default); +env(Zone, Par, Default) -> + try ets:lookup_element(?TAB, {Zone, Par}, 2) + catch error:badarg -> + emqx_config:get_env(Par, Default) + end. %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -54,8 +62,8 @@ handle_cast(Msg, State) -> handle_info(reload, State) -> lists:foreach( - fun({Zone, Options}) -> - [ets:insert(?TAB, {{Zone, Par}, Val}) || {Par, Val} <- Options] + fun({Zone, Opts}) -> + [ets:insert(?TAB, {{Zone, Par}, Val}) || {Par, Val} <- Opts] end, emqx_config:get_env(zones, [])), {noreply, ensure_reload_timer(State), hibernate};