From 2eed46310c58bf80697b308255e72f972eb942d5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 30 Mar 2018 17:18:08 +0800 Subject: [PATCH] Misc fix for the MQTT Version 5.0 --- include/emqx.hrl | 196 ++++++++++++++++------------------ include/emqx_mqtt.hrl | 26 ++++- src/emqx_access_control.erl | 29 ++--- src/emqx_access_rule.erl | 18 ++-- src/emqx_acl_internal.erl | 2 +- src/emqx_acl_mod.erl | 5 +- src/emqx_auth_mod.erl | 2 +- src/emqx_cm.erl | 116 +++++++++----------- src/emqx_cm_sup.erl | 27 ++--- src/emqx_connection.erl | 4 +- src/emqx_message.erl | 167 ++++++++--------------------- src/emqx_mod_presence.erl | 49 +++++---- src/emqx_mod_subscription.erl | 8 +- src/emqx_packet.erl | 46 +++++++- src/emqx_protocol.erl | 61 +++++------ src/emqx_rpc.erl | 7 +- src/emqx_serializer.erl | 1 + src/emqx_session.erl | 82 +++++++------- src/emqx_sm.erl | 15 ++- src/emqx_sm_locker.erl | 4 +- src/emqx_sm_sup.erl | 11 +- src/emqx_tracer.erl | 4 +- 22 files changed, 429 insertions(+), 451 deletions(-) diff --git a/include/emqx.hrl b/include/emqx.hrl index d584f2cd3..ddd91dbca 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,97 +18,118 @@ %% Banner %%-------------------------------------------------------------------- --define(COPYRIGHT, "Copyright (c) 2013-2018 EMQ Enterprise, Inc."). +-define(COPYRIGHT, "Copyright (c) 2013-2018 EMQ Inc."). -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0"). -define(PROTOCOL_VERSION, "MQTT/5.0"). -%%-define(ERTS_MINIMUM, "9.0"). +-define(ERTS_MINIMUM_REQUIRED, "9.2"). %%-------------------------------------------------------------------- -%% Sys/Queue/Share Topics' Prefix +%% Topics' prefix: $SYS | $queue | $share %%-------------------------------------------------------------------- --define(SYSTOP, <<"$SYS/">>). %% System Topic +%% System Topic +-define(SYSTOP, <<"$SYS/">>). --define(QUEUE, <<"$queue/">>). %% Queue Topic +%% Queue Topic +-define(QUEUE, <<"$queue/">>). --define(SHARE, <<"$share/">>). %% Shared Topic +%% Shared Topic +-define(SHARE, <<"$share/">>). %%-------------------------------------------------------------------- -%% Client and Session +%% Topic, subscription and subscriber %%-------------------------------------------------------------------- +-type(qos() :: integer()). + -type(topic() :: binary()). --type(subscriber() :: pid() | binary() | {binary(), pid()}). +-type(suboption() :: {qos, qos()} + | {share, '$queue'} + | {share, binary()} + | {atom(), term()}). --type(suboption() :: {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). +-record(subscription, + { subid :: binary() | atom(), + topic :: topic(), + subopts :: list(suboption()) + }). + +-type(subscription() :: #subscription{}). + +-type(subscriber() :: binary() | pid() | {binary(), pid()}). + +%%-------------------------------------------------------------------- +%% Client and session +%%-------------------------------------------------------------------- + +-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). + +-type(peername() :: {inet:ip_address(), inet:port_number()}). -type(client_id() :: binary() | atom()). --type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). +-type(username() :: binary() | atom()). --type(client() :: #{zone := atom(), - node := atom(), - clientid := client_id(), - protocol := protocol(), - connector => atom(), - peername => {inet:ip_address(), inet:port_number()}, - username => binary(), - atom() => term()}). +-type(mountpoint() :: binary()). --type(session() :: #{client_id := client_id(), - clean_start := boolean(), - expiry_interval := non_neg_integer()}). +-type(connector() :: atom()). --record(session, {client_id, sess_pid}). +-type(zone() :: atom()). + +-record(client, + { id :: client_id(), + pid :: pid(), + zone :: zone(), + node :: node(), + username :: username(), + peername :: peername(), + protocol :: protocol(), + connector :: connector(), + mountpoint :: mountpoint(), + attributes :: #{atom() => term()} + }). + +-type(client() :: #client{}). + +-record(session, + { client_id :: client_id(), + pid :: pid() + }). + +-type(session() :: #session{}). %%-------------------------------------------------------------------- -%% Message and Delivery +%% Message and delivery %%-------------------------------------------------------------------- -type(message_id() :: binary() | undefined). -%% -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). +-type(message_flag() :: sys | dup | retain | atom()). --type(message_from() :: #{zone := atom(), - node := atom(), - clientid := binary(), - connector => atom(), - peername => {inet:ip_address(), inet:port_number()}, - username => binary(), - atom() => term()}). - --type(message_flags() :: #{dup => boolean(), %% Dup flag - qos => 0 | 1 | 2, %% QoS - sys => boolean(), %% $SYS flag - retain => boolean(), %% Retain flag - durable => boolean(), %% Durable flag - atom() => boolean()}). +-type(message_flags() :: #{message_flag() => boolean()}). -type(message_headers() :: #{packet_id => pos_integer(), - priority => pos_integer(), - expiry => integer(), %% Time to live + priority => non_neg_integer(), + ttl => pos_integer(), atom() => term()}). +-type(payload() :: binary()). + %% See 'Application Message' in MQTT Version 5.0 -record(message, { id :: message_id(), %% Global unique id - from :: message_from(), %% Message from + qos :: qos(), %% Message QoS + from :: atom() | client(), %% Message from sender :: pid(), %% The pid of the sender/publisher - packet_id, - dup :: boolean(), %% Dup flag - qos :: 0 | 1 | 2, %% QoS - sys :: boolean(), %% $SYS flag - retain :: boolean(), %% Retain flag - flags = [], %% :: message_flags(), %% Message flags + flags :: message_flags(), %% Message flags headers :: message_headers(), %% Message headers - protocol :: protocol(), topic :: binary(), %% Message topic properties :: map(), %% Message user properties - payload :: binary(), %% Message payload + payload :: payload(), %% Message payload timestamp :: erlang:timestamp() %% Timestamp }). @@ -127,62 +148,16 @@ -type(pubsub() :: publish | subscribe). --define(PS(PS), (PS =:= publish orelse PS =:= subscribe)). - -%%-------------------------------------------------------------------- -%% Subscription -%%-------------------------------------------------------------------- - --record(subscription, - { subid :: binary() | atom(), - topic :: binary(), - subopts :: list() - }). - --type(subscription() :: #subscription{}). - -%%-------------------------------------------------------------------- -%% MQTT Client -%%-------------------------------------------------------------------- - --type(ws_header_key() :: atom() | binary() | string()). --type(ws_header_val() :: atom() | binary() | string() | integer()). - --record(mqtt_client, - { client_id :: binary() | undefined, - client_pid :: pid(), - username :: binary() | undefined, - peername :: {inet:ip_address(), inet:port_number()}, - clean_sess :: boolean(), - proto_ver :: 3 | 4, - keepalive = 0, - will_topic :: undefined | binary(), - ws_initial_headers :: list({ws_header_key(), ws_header_val()}), - mountpoint :: undefined | binary(), - connected_at :: erlang:timestamp(), - %%TODO: Headers - headers = [] :: list() - }). - --type(mqtt_client() :: #mqtt_client{}). - -%%-------------------------------------------------------------------- -%% MQTT Session -%%-------------------------------------------------------------------- - --record(mqtt_session, - { client_id :: binary(), - sess_pid :: pid(), - clean_sess :: boolean() - }). - --type(mqtt_session() :: #mqtt_session{}). +-define(PS(I), (I =:= publish orelse I =:= subscribe)). %%-------------------------------------------------------------------- %% Route %%-------------------------------------------------------------------- --record(route, { topic :: binary(), dest :: {binary(), node()} | node() }). +-record(route, + { topic :: topic(), + dest :: {binary(), node()} | node() + }). -type(route() :: #route{}). @@ -227,7 +202,15 @@ %% Plugin %%-------------------------------------------------------------------- --record(plugin, { name, version, descr, active = false }). +-record(plugin, + { name :: atom(), + version :: string(), + dir :: string(), + descr :: string(), + vendor :: string(), + active :: boolean(), + info :: map() + }). -type(plugin() :: #plugin{}). @@ -235,7 +218,14 @@ %% Command %%-------------------------------------------------------------------- --record(command, { name, action, args = [], opts = [], usage, descr }). +-record(command, + { name, + action, + args = [], + opts = [], + usage, + descr + }). -type(command() :: #command{}). diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 62ee0e7d3..65f6a1274 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -77,6 +77,26 @@ -define(MAX_CLIENTID_LEN, 1024). +%%-------------------------------------------------------------------- +%% MQTT Client +%%-------------------------------------------------------------------- + +-record(mqtt_client, + { client_id :: binary() | undefined, + client_pid :: pid(), + username :: binary() | undefined, + peername :: {inet:ip_address(), inet:port_number()}, + clean_sess :: boolean(), + proto_ver :: 3 | 4, + keepalive = 0, + will_topic :: undefined | binary(), + mountpoint :: undefined | binary(), + connected_at :: erlang:timestamp(), + attributes :: map() + }). + +-type(mqtt_client() :: #mqtt_client{}). + %%-------------------------------------------------------------------- %% MQTT Control Packet Types %%-------------------------------------------------------------------- @@ -278,12 +298,12 @@ -define(CONNACK_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{reason_code = ReturnCode}}). + variable = #mqtt_packet_connack{reason_code = ReasonCode}}). -define(CONNACK_PACKET(ReasonCode, SessPresent), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{ack_flags = SessPresent, - reason_code = ReturnCode}}). + variable = #mqtt_packet_connack{ack_flags = SessPresent, + reason_code = ReasonCode}}). -define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 10aa68e91..4d573e283 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -30,7 +30,7 @@ -define(SERVER, ?MODULE). --define(ACCESS_CONTROL_TAB, mqtt_access_control). +-define(TAB, access_control). -type(password() :: undefined | binary()). @@ -45,9 +45,10 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%% @doc Authenticate MQTT Client. --spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, term()}). -auth(Client, Password) when is_record(Client, mqtt_client) -> +%% @doc Authenticate Client. +-spec(auth(Client :: client(), Password :: password()) + -> ok | {ok, boolean()} | {error, term()}). +auth(Client, Password) when is_record(Client, client) -> auth(Client, Password, lookup_mods(auth)). auth(_Client, _Password, []) -> case emqx:env(allow_anonymous, false) of @@ -65,7 +66,7 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> %% @doc Check ACL -spec(check_acl(Client, PubSub, Topic) -> allow | deny when - Client :: mqtt_client(), + Client :: client(), PubSub :: pubsub(), Topic :: binary()). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> @@ -102,7 +103,7 @@ unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl -> %% @doc Lookup authentication or ACL modules. -spec(lookup_mods(auth | acl) -> list()). lookup_mods(Type) -> - case ets:lookup(?ACCESS_CONTROL_TAB, tab_key(Type)) of + case ets:lookup(?TAB, tab_key(Type)) of [] -> []; [{_, Mods}] -> Mods end. @@ -111,14 +112,15 @@ tab_key(auth) -> auth_modules; tab_key(acl) -> acl_modules. %% @doc Stop access control server. -stop() -> gen_server:call(?MODULE, stop). +stop() -> + gen_server:call(?MODULE, stop). %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- init([]) -> - ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), + _ = ets:new(?TAB, [set, named_table, protected, {read_concurrency, true}]), {ok, #state{}}. handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> @@ -130,7 +132,7 @@ handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> Seq1 >= Seq2 end, [{Mod, ModState, Seq} | Mods]), - ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}), + ets:insert(?TAB, {tab_key(Type), NewMods}), ok; {error, Error} -> {error, Error}; @@ -145,7 +147,7 @@ handle_call({unregister_mod, Type, Mod}, _From, State) -> false -> {reply, {error, not_found}, State}; _ -> - ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), + _ = ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), {reply, ok, State} end; @@ -172,7 +174,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -if_existed(false, Fun) -> Fun(); - -if_existed(_Mod, _Fun) -> {error, already_existed}. +if_existed(false, Fun) -> + Fun(); +if_existed(_Mod, _Fun) -> + {error, already_existed}. diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 1d54d0fef..63bf3b9ad 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -82,7 +82,7 @@ bin(B) when is_binary(B) -> B. %% @doc Match access rule --spec(match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). +-spec(match(client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> {matched, AllowDeny}; match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) @@ -99,13 +99,13 @@ match_who(_Client, {user, all}) -> true; match_who(_Client, {client, all}) -> true; -match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) -> +match_who(#client{id = ClientId}, {client, ClientId}) -> true; -match_who(#mqtt_client{username = Username}, {user, Username}) -> +match_who(#client{username = Username}, {user, Username}) -> true; -match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) -> +match_who(#client{peername = undefined}, {ipaddr, _Tup}) -> false; -match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, CIDR}) -> +match_who(#client{peername = {IP, _}}, {ipaddr, CIDR}) -> esockd_cidr:match(IP, CIDR); match_who(Client, {'and', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> @@ -137,13 +137,13 @@ feed_var(Client, Pattern) -> feed_var(Client, Pattern, []). feed_var(_Client, [], Acc) -> lists:reverse(Acc); -feed_var(Client = #mqtt_client{client_id = undefined}, [<<"%c">>|Words], Acc) -> +feed_var(Client = #client{id = undefined}, [<<"%c">>|Words], Acc) -> feed_var(Client, Words, [<<"%c">>|Acc]); -feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"%c">>|Words], Acc) -> +feed_var(Client = #client{id = ClientId}, [<<"%c">>|Words], Acc) -> feed_var(Client, Words, [ClientId |Acc]); -feed_var(Client = #mqtt_client{username = undefined}, [<<"%u">>|Words], Acc) -> +feed_var(Client = #client{username = undefined}, [<<"%u">>|Words], Acc) -> feed_var(Client, Words, [<<"%u">>|Acc]); -feed_var(Client = #mqtt_client{username = Username}, [<<"%u">>|Words], Acc) -> +feed_var(Client = #client{username = Username}, [<<"%u">>|Words], Acc) -> feed_var(Client, Words, [Username|Acc]); feed_var(Client, [W|Words], Acc) -> feed_var(Client, Words, [W|Acc]). diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 030af96c3..4694f43c3 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -77,7 +77,7 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> %% @doc Check ACL -spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when - Client :: mqtt_client(), + Client :: client(), PubSub :: pubsub(), Topic :: binary(), State :: #state{}). diff --git a/src/emqx_acl_mod.erl b/src/emqx_acl_mod.erl index 40f72f898..e01c0ed95 100644 --- a/src/emqx_acl_mod.erl +++ b/src/emqx_acl_mod.erl @@ -26,9 +26,10 @@ -callback(init(AclOpts :: list()) -> {ok, State :: any()}). --callback(check_acl({Client :: mqtt_client(), +-callback(check_acl({Client :: client(), PubSub :: pubsub(), - Topic :: binary()}, State :: any()) -> allow | deny | ignore). + Topic :: topic()}, State :: any()) + -> allow | deny | ignore). -callback(reload_acl(State :: any()) -> ok | {error, term()}). diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index bc2057a06..2cf222d65 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -30,7 +30,7 @@ -callback(init(AuthOpts :: list()) -> {ok, State :: any()}). --callback(check(Client :: mqtt_client(), +-callback(check(Client :: client(), Password :: binary(), State :: any()) -> ok | {ok, boolean()} | ignore | {error, string()}). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index c929ecb1f..4d08a499d 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -20,110 +20,93 @@ -include("emqx.hrl"). -%% API Exports --export([start_link/3]). +-export([start_link/1]). --export([lookup/1, lookup_proc/1, reg/1, unreg/1]). +-export([lookup/1, reg/1, unreg/1]). -%% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id, statsfun, monitors}). +-record(state, {stats_fun, stats_timer, monitors}). --define(POOL, ?MODULE). +-define(SERVER, ?MODULE). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -%% @doc Start Client Manager --spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id, StatsFun) -> - gen_server:start_link(?MODULE, [Pool, Id, StatsFun], []). +%% @doc Start the client manager +-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}). +start_link(StatsFun) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []). -%% @doc Lookup Client by ClientId --spec(lookup(binary()) -> mqtt_client() | undefined). +%% @doc Lookup ClientPid by ClientId +-spec(lookup(client_id()) -> pid() | undefined). lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(mqtt_client, ClientId) of [Client] -> Client; [] -> undefined end. - -%% @doc Lookup client pid by clientId --spec(lookup_proc(binary()) -> pid() | undefined). -lookup_proc(ClientId) when is_binary(ClientId) -> - try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid) + try ets:lookup_element(client, ClientId, 2) catch error:badarg -> undefined end. -%% @doc Register ClientId with Pid. --spec(reg(mqtt_client()) -> ok). -reg(Client = #mqtt_client{client_id = ClientId}) -> - gen_server:call(pick(ClientId), {reg, Client}, 120000). +%% @doc Register a clientId +-spec(reg(client_id()) -> ok). +reg(ClientId) -> + gen_server:cast(?SERVER, {reg, ClientId, self()}). %% @doc Unregister clientId with pid. --spec(unreg(binary()) -> ok). -unreg(ClientId) when is_binary(ClientId) -> - gen_server:cast(pick(ClientId), {unreg, ClientId, self()}). - -pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId). +-spec(unreg(client_id()) -> ok). +unreg(ClientId) -> + gen_server:cast(?SERVER, {unreg, ClientId, self()}). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([Pool, Id, StatsFun]) -> - gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}. - -handle_call({reg, Client = #mqtt_client{client_id = ClientId, - client_pid = Pid}}, _From, State) -> - case lookup_proc(ClientId) of - Pid -> - {reply, ok, State}; - _ -> - ets:insert(mqtt_client, Client), - {reply, ok, setstats(monitor_client(ClientId, Pid, State))} - end; +init([StatsFun]) -> + {ok, Ref} = timer:send_interval(timer:seconds(1), stats), + {ok, #state{stats_fun = StatsFun, stats_timer = Ref, monitors = dict:new()}}. handle_call(Req, _From, State) -> - lager:error("[MQTT-CM] Unexpected Call: ~p", [Req]), + emqx_log:error("[CM] Unexpected request: ~p", [Req]), {reply, ignore, State}. +handle_cast({reg, ClientId, Pid}, State) -> + _ = ets:insert(client, {ClientId, Pid}), + {noreply, monitor_client(ClientId, Pid, State)}; + handle_cast({unreg, ClientId, Pid}, State) -> - case lookup_proc(ClientId) of - Pid -> - ets:delete(mqtt_client, ClientId), - {noreply, setstats(State)}; - _ -> - {noreply, State} - end; + case lookup(ClientId) of + Pid -> remove_client({ClientId, Pid}); + _ -> ok + end, + {noreply, State}; handle_cast(Msg, State) -> - lager:error("[MQTT-CM] Unexpected Cast: ~p", [Msg]), + emqx_log:error("[CM] Unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of - {ok, {ClientId, DownPid}} -> - case lookup_proc(ClientId) of - DownPid -> - emqx_stats:del_client_stats(ClientId), - ets:delete(mqtt_client, ClientId); - _ -> - ignore + {ok, ClientId} -> + case lookup(ClientId) of + DownPid -> remove_client({ClientId, DownPid}); + _ -> ok end, - {noreply, setstats(erase_monitor(MRef, State))}; + {noreply, erase_monitor(MRef, State)}; error -> - lager:error("MRef of client ~p not found", [DownPid]), + emqx_log:error("[CM] down client ~p not found", [DownPid]), {noreply, State} end; +handle_info(stats, State) -> + {noreply, setstats(State), hibernate}; + handle_info(Info, State) -> lager:error("[CM] Unexpected Info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). +terminate(_Reason, _State = #state{stats_timer = TRef}) -> + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -132,14 +115,19 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +remove_client(Client) -> + ets:delete_object(client, Client), + ets:delete(client_stats, Client), + ets:delete(client_attrs, Client). + monitor_client(ClientId, Pid, State = #state{monitors = Monitors}) -> MRef = erlang:monitor(process, Pid), - State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monitors)}. + State#state{monitors = dict:store(MRef, ClientId, Monitors)}. erase_monitor(MRef, State = #state{monitors = Monitors}) -> - erlang:demonitor(MRef, [flush]), + erlang:demonitor(MRef), State#state{monitors = dict:erase(MRef, Monitors)}. -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(mqtt_client, size)), State. +setstats(State = #state{stats_fun = StatsFun}) -> + StatsFun(ets:info(client, size)), State. diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index b73b2f5f5..04446c725 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -18,33 +18,24 @@ -behaviour(supervisor). -%% API -export([start_link/0]). -%% Supervisor callbacks -export([init/1]). --define(TAB, mqtt_client). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - %% Create client table - create_client_tab(), + %% Create table + lists:foreach(fun create_tab/1, [client, client_stats, client_attrs]), - %% CM Pool Sup - MFA = {emqx_cm, start_link, [emqx_stats:statsfun('clients/count', 'clients/max')]}, - PoolSup = emqx_pool_sup:spec([emqx_cm, hash, erlang:system_info(schedulers), MFA]), + StatsFun = emqx_stats:statsfun('clients/count', 'clients/max'), + + CM = {emqx_cm, {emqx_cm, start_link, [StatsFun]}, + permanent, 5000, worker, [emqx_cm]}, - {ok, {{one_for_all, 10, 3600}, [PoolSup]}}. + {ok, {{one_for_all, 10, 3600}, [CM]}}. -create_client_tab() -> - case ets:info(?TAB, name) of - undefined -> - ets:new(?TAB, [ordered_set, named_table, public, - {keypos, 2}, {write_concurrency, true}]); - _ -> - ok - end. +create_tab(Tab) -> + emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 1555f29cb..e09ad83b2 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -202,8 +202,8 @@ handle_info({suback, PacketId, GrantedQos}, State) -> end, State); %% Fastlane -handle_info({dispatch, _Topic, Message}, State) -> - handle_info({deliver, Message#message{qos = ?QOS_0}}, State); +handle_info({dispatch, _Topic, Msg}, State) -> + handle_info({deliver, emqx_message:set_flag(qos, ?QOS_0, Msg)}, State); handle_info({deliver, Message}, State) -> with_proto( diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 5ef4f8f8f..50198f3e1 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -18,139 +18,62 @@ -include("emqx.hrl"). --include("emqx_mqtt.hrl"). +-export([make/3]). --export([make/3, make/4, from_packet/1, from_packet/2, from_packet/3, - to_packet/1]). +-export([get_flag/2, get_flag/3, set_flag/2, unset_flag/2]). --export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). +-export([get_header/2, get_header/3, set_header/3]). --export([format/1]). +-export([get_user_property/2, get_user_property/3, set_user_property/3]). --type(msg_from() :: atom() | {binary(), undefined | binary()}). - -%% @doc Make a message --spec(make(msg_from(), binary(), binary()) -> message()). -make(From, Topic, Payload) -> - make(From, ?QOS_0, Topic, Payload). - --spec(make(msg_from(), mqtt_qos(), binary(), binary()) -> message()). -make(From, Qos, Topic, Payload) -> - #message{id = msgid(), - from = From, - qos = ?QOS_I(Qos), - topic = Topic, - payload = Payload, - timestamp = os:timestamp()}. - -%% @doc Message from Packet --spec(from_packet(mqtt_packet()) -> message()). -from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - retain = Retain, - qos = Qos, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId}, - payload = Payload}) -> - #message{id = msgid(), - packet_id = PacketId, - qos = Qos, - retain = Retain, - dup = Dup, - topic = Topic, - payload = Payload, - timestamp = os:timestamp()}; - -from_packet(#mqtt_packet_connect{will_flag = false}) -> - undefined; - -from_packet(#mqtt_packet_connect{client_id = ClientId, - username = Username, - will_retain = Retain, - will_qos = Qos, - will_topic = Topic, - will_msg = Msg}) -> - #message{id = msgid(), - topic = Topic, - from = {ClientId, Username}, - retain = Retain, - qos = Qos, - dup = false, - payload = Msg, - timestamp = os:timestamp()}. - -from_packet(ClientId, Packet) -> - Msg = from_packet(Packet), - Msg#message{from = ClientId}. - -from_packet(Username, ClientId, Packet) -> - Msg = from_packet(Packet), - Msg#message{from = {ClientId, Username}}. +%% Create a default message +-spec(make(atom() | client(), topic(), payload()) -> message()). +make(From, Topic, Payload) when is_atom(From); is_record(From, client) -> + #message{id = msgid(), + qos = 0, + from = From, + sender = self(), + flags = #{}, + headers = #{}, + topic = Topic, + properties = #{}, + payload = Payload, + timestamp = os:timestamp()}. msgid() -> emqx_guid:gen(). -%% @doc Message to Packet --spec(to_packet(message()) -> mqtt_packet()). -to_packet(#message{packet_id = PkgId, - qos = Qos, - retain = Retain, - dup = Dup, - topic = Topic, - payload = Payload}) -> +%% @doc Get flag +get_flag(Flag, Msg) -> + get_flag(Flag, Msg, false). +get_flag(Flag, #message{flags = Flags}, Default) -> + maps:get(Flag, Flags, Default). - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = Qos, - retain = Retain, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = if - Qos =:= ?QOS_0 -> undefined; - true -> PkgId - end - }, - payload = Payload}. +%% @doc Set flag +-spec(set_flag(message_flag(), message()) -> message()). +set_flag(Flag, Msg = #message{flags = Flags}) -> + Msg#message{flags = maps:put(Flag, true, Flags)}. -%% @doc set dup, retain flag --spec(set_flag(message()) -> message()). -set_flag(Msg) -> - Msg#message{dup = true, retain = true}. +%% @doc Unset flag +-spec(unset_flag(message_flag(), message()) -> message()). +unset_flag(Flag, Msg = #message{flags = Flags}) -> + Msg#message{flags = maps:remove(Flag, Flags)}. --spec(set_flag(atom(), message()) -> message()). -set_flag(dup, Msg = #message{dup = false}) -> - Msg#message{dup = true}; -set_flag(sys, Msg = #message{sys = false}) -> - Msg#message{sys = true}; -set_flag(retain, Msg = #message{retain = false}) -> - Msg#message{retain = true}; -set_flag(Flag, Msg) when Flag =:= dup; - Flag =:= retain; - Flag =:= sys -> Msg. +%% @doc Get header +get_header(Hdr, Msg) -> + get_header(Hdr, Msg, undefined). +get_header(Hdr, #message{headers = Headers}, Default) -> + maps:get(Hdr, Headers, Default). -%% @doc Unset dup, retain flag --spec(unset_flag(message()) -> message()). -unset_flag(Msg) -> - Msg#message{dup = false, retain = false}. +%% @doc Set header +set_header(Hdr, Val, Msg = #message{headers = Headers}) -> + Msg#message{headers = maps:put(Hdr, Val, Headers)}. --spec(unset_flag(dup | retain | atom(), message()) -> message()). -unset_flag(dup, Msg = #message{dup = true}) -> - Msg#message{dup = false}; -unset_flag(retain, Msg = #message{retain = true}) -> - Msg#message{retain = false}; -unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. +%% @doc Get user property +get_user_property(Key, Msg) -> + get_user_property(Key, Msg, undefined). +get_user_property(Key, #message{properties = Props}, Default) -> + maps:get(Key, Props, Default). -%% @doc Format MQTT Message -format(#message{id = MsgId, packet_id = PktId, from = {ClientId, Username}, - qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> - io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)", - [i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]); - -%% TODO:... -format(#message{id = MsgId, packet_id = PktId, from = From, - qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> - io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)", - [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]). - -i(true) -> 1; -i(false) -> 0; -i(I) when is_integer(I) -> I. +set_user_property(Key, Val, Msg = #message{properties = Props}) -> + Msg#message{properties = maps:put(Key, Val, Props)}. diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 559556443..ab9c553b1 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -28,44 +28,55 @@ load(Env) -> emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). -on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId, - username = Username, - peername = {IpAddr, _}, - clean_sess = CleanSess, - proto_ver = ProtoVer}, Env) -> - Payload = mochijson2:encode([{clientid, ClientId}, +on_client_connected(ConnAck, Client = #client{id = ClientId, + username = Username, + peername = {IpAddr, _} + %%clean_sess = CleanSess, + %%proto_ver = ProtoVer + }, Env) -> + case catch emqx_json:encode([{clientid, ClientId}, {username, Username}, {ipaddress, iolist_to_binary(emqx_net:ntoa(IpAddr))}, - {clean_sess, CleanSess}, - {protocol, ProtoVer}, + %%{clean_sess, CleanSess}, %%TODO:: fixme later + %%{protocol, ProtoVer}, {connack, ConnAck}, - {ts, emqx_time:now_secs()}]), - Msg = message(qos(Env), topic(connected, ClientId), Payload), - emqx:publish(emqx_message:set_flag(sys, Msg)), + {ts, emqx_time:now_secs()}]) of + Payload when is_binary(Payload) -> + Msg = message(qos(Env), topic(connected, ClientId), Payload), + emqx:publish(emqx_message:set_flag(sys, Msg)); + {'EXIT', Reason} -> + emqx_log:error("[Presence Module] json error: ~p", [Reason]) + end, {ok, Client}. -on_client_disconnected(Reason, #mqtt_client{client_id = ClientId, - username = Username}, Env) -> - Payload = mochijson2:encode([{clientid, ClientId}, +on_client_disconnected(Reason, #client{id = ClientId, + username = Username}, Env) -> + case catch emqx_json:encode([{clientid, ClientId}, {username, Username}, {reason, reason(Reason)}, - {ts, emqx_time:now_secs()}]), - Msg = message(qos(Env), topic(disconnected, ClientId), Payload), - emqx:publish(emqx_message:set_flag(sys, Msg)), ok. + {ts, emqx_time:now_secs()}]) of + Payload when is_binary(Payload) -> + Msg = message(qos(Env), topic(disconnected, ClientId), Payload), + emqx:publish(emqx_message:set_flag(sys, Msg)); + {'EXIT', Reason} -> + emqx_log:error("[Presence Module] json error: ~p", [Reason]) + end, ok. unload(_Env) -> emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3), emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). message(Qos, Topic, Payload) -> - emqx_message:make(presence, Qos, Topic, iolist_to_binary(Payload)). + Msg = emqx_message:make(presence, Topic, iolist_to_binary(Payload)), + emqx_message:set_header(qos, Qos, Msg). topic(connected, ClientId) -> emqx_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); topic(disconnected, ClientId) -> emqx_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). -qos(Env) -> proplists:get_value(qos, Env, 0). +qos(Env) -> + proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index b6e3726e5..d37a1bec4 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -33,9 +33,9 @@ load(Topics) -> emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). -on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId, - client_pid = ClientPid, - username = Username}, Topics) -> +on_client_connected(?CONNACK_ACCEPT, Client = #client{id = ClientId, + pid = ClientPid, + username = Username}, Topics) -> Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics], @@ -49,7 +49,7 @@ unload(_) -> emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3). %%-------------------------------------------------------------------- -%% Internal Functions +%% Internal functions %%-------------------------------------------------------------------- rep(<<"%c">>, ClientId, Topic) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 3762aff7e..acdd52aab 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -20,11 +20,12 @@ -include("emqx_mqtt.hrl"). -%% API -export([protocol_name/1, type_name/1, connack_name/1]). -export([format/1]). +-export([to_message/1, from_message/1]). + %% @doc Protocol name of version -spec(protocol_name(mqtt_vsn()) -> binary()). protocol_name(?MQTT_PROTO_V3) -> <<"MQIsdp">>; @@ -45,6 +46,49 @@ connack_name(?CONNACK_SERVER) -> 'CONNACK_SERVER'; connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS'; connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'. +%% @doc From Message to Packet +-spec(from_message(message()) -> mqtt_packet()). +from_message(Msg = #message{qos = Qos, + topic = Topic, + payload = Payload}) -> + Dup = emqx_message:get_flag(dup, Msg, false), + Retain = emqx_message:get_flag(retain, Msg, false), + PacketId = emqx_message:get_header(packet_id, Msg), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = Qos, + retain = Retain, + dup = Dup}, + variable = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId}, + payload = Payload}. + +%% @doc Message from Packet +-spec(to_message(mqtt_packet()) -> message()). +to_message(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + retain = Retain, + qos = Qos, + dup = Dup}, + variable = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + properties = Props}, + payload = Payload}) -> + Msg = emqx_message:make(undefined, Topic, Payload), + Msg#message{qos = Qos, + flags = #{dup => Dup, retain => Retain}, + headers = #{packet_id => PacketId}, + properties = Props}; +to_message(#mqtt_packet_connect{will_flag = false}) -> + undefined; +to_message(#mqtt_packet_connect{will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_props = Props, + will_msg = Payload}) -> + Msg = emqx_message:make(undefined, Topic, Payload), + Msg#message{flags = #{retain => Retain}, + headers = #{qos => Qos}, + properties = Props}. + %% @doc Format packet -spec(format(mqtt_packet()) -> iolist()). format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index b61040443..1623432a0 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -122,17 +122,11 @@ client(#proto_state{client_id = ClientId, WillMsg =:= undefined -> undefined; true -> WillMsg#message.topic end, - #mqtt_client{client_id = ClientId, - client_pid = ClientPid, - username = Username, - peername = Peername, - clean_sess = CleanSess, - proto_ver = ProtoVer, - keepalive = Keepalive, - will_topic = WillTopic, - ws_initial_headers = WsInitialHeaders, - mountpoint = MountPoint, - connected_at = Time}. + #client{id = ClientId, + pid = ClientPid, + username = Username, + peername = Peername, + mountpoint = MountPoint}. session(#proto_state{session = Session}) -> Session. @@ -220,12 +214,14 @@ process(?CONNECT_PACKET(Var), State0) -> %% Start session case emqx_sm:open_session(#{clean_start => CleanSess, - client_id => clientid(State2), - username => Username}) of + client_id => clientid(State2), + username => Username, + client_pid => self()}) of {ok, Session} -> %% TODO:... SP = true, %% TODO:... - %% Register the client - emqx_cm:reg(client(State2)), + %% TODO: Register the client + emqx_cm:reg(clientid(State2)), + %%emqx_cm:reg(client(State2)), %% Start keepalive start_keepalive(KeepAlive, State2), %% Emit Stats @@ -245,7 +241,7 @@ process(?CONNECT_PACKET(Var), State0) -> %% Run hooks emqx_hooks:run('client.connected', [ReturnCode1], client(State3)), %%TODO: Send Connack - %% send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), + send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), %% stop if authentication failure stop_if_auth_failure(ReturnCode1, State3); @@ -330,8 +326,9 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), username = Username, mountpoint = MountPoint, session = Session}) -> - Msg = emqx_message:from_packet(Username, ClientId, Packet), - emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)); + Msg = emqx_packet:to_message(Packet), + Msg1 = Msg#message{from = #client{id = ClientId, username = Username}}, + emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1)); publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> with_puback(?PUBACK, Packet, State); @@ -344,8 +341,10 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), username = Username, mountpoint = MountPoint, session = Session}) -> - Msg = emqx_message:from_packet(Username, ClientId, Packet), - case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)) of + %% TODO: ... + Msg = emqx_packet:to_message(Packet), + Msg1 = Msg#message{from = #client{id = ClientId, username = Username}}, + case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1)) of ok -> send(?PUBACK_PACKET(Type, PacketId), State); {error, Error} -> @@ -359,7 +358,7 @@ send(Msg, State = #proto_state{client_id = ClientId, is_bridge = IsBridge}) when is_record(Msg, message) -> emqx_hooks:run('message.delivered', [ClientId, Username], Msg), - send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); + send(emqx_packet:from_message(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> trace(send, Packet, State), @@ -421,7 +420,7 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ok. willmsg(Packet, State = #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> - case emqx_message:from_packet(Packet) of + case emqx_packet:to_message(Packet) of undefined -> undefined; Msg -> mount(replvar(MountPoint, State), Msg) end. @@ -438,8 +437,8 @@ maybe_set_clientid(State) -> send_willmsg(_Client, undefined) -> ignore; -send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> - emqx_broker:publish(WillMsg#message{from = {ClientId, Username}}). +send_willmsg(Client, WillMsg) -> + emqx_broker:publish(WillMsg#message{from = Client}). start_keepalive(0, _State) -> ignore; @@ -507,12 +506,13 @@ validate_packet(_Packet) -> validate_topics(_Type, []) -> {error, empty_topics}; -validate_topics(Type, TopicTable = [{_Topic, _Qos}|_]) +validate_topics(Type, TopicTable = [{_Topic, _SubOpts}|_]) when Type =:= name orelse Type =:= filter -> Valid = fun(Topic, Qos) -> emqx_topic:validate({Type, Topic}) and validate_qos(Qos) end, - case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of + case [Topic || {Topic, SubOpts} <- TopicTable, + not Valid(Topic, proplists:get_value(qos, SubOpts))] of [] -> ok; _ -> {error, badtopic} end; @@ -531,9 +531,10 @@ validate_qos(_) -> false. parse_topic_table(TopicTable) -> - lists:map(fun({Topic0, Qos}) -> + lists:map(fun({Topic0, SubOpts}) -> {Topic, Opts} = emqx_topic:parse(Topic0), - {Topic, [{qos, Qos}|Opts]} + %%TODO: + {Topic, lists:usort(lists:umerge(Opts, SubOpts))} end, TopicTable). parse_topics(Topics) -> @@ -570,10 +571,10 @@ sp(false) -> 0. %% The retained flag should be propagated for bridge. %%-------------------------------------------------------------------- -clean_retain(false, Msg = #message{retain = true, headers = Headers}) -> +clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) -> case lists:member(retained, Headers) of true -> Msg; - false -> Msg#message{retain = false} + false -> emqx_message:set_flag(retain, false, Msg) end; clean_retain(_IsBridge, Msg) -> Msg. diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index a95dd7323..f76714f76 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -16,9 +16,12 @@ -module(emqx_rpc). --export([cast/4]). +-export([call/4, cast/4]). + +call(Node, Mod, Fun, Args) -> + rpc:call(Node, Mod, Fun, Args). cast(Node, Mod, Fun, Args) -> + %%TODO: not right emqx_metrics:inc('messages/forward'), rpc:cast(Node, Mod, Fun, Args). - diff --git a/src/emqx_serializer.erl b/src/emqx_serializer.erl index 63906e426..eb1cec4db 100644 --- a/src/emqx_serializer.erl +++ b/src/emqx_serializer.erl @@ -90,6 +90,7 @@ serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId, properties = Properties, reason_codes = ReasonCodes}, undefined) -> + io:format("SubAck ReasonCodes: ~p~n", [ReasonCodes]), {<>, << <> || Code <- ReasonCodes >>}; serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1c18907bc..b9c1c87f0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -29,7 +29,7 @@ -import(proplists, [get_value/2, get_value/3]). %% Session API --export([start_link/3, resume/3, destroy/2]). +-export([start_link/1, resume/3, discard/2]). %% Management and Monitor API -export([state/1, info/1, stats/1]). @@ -42,9 +42,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% TODO: gen_server Message Priorities --export([handle_pre_hibernate/1]). - -define(MQueue, emqx_mqueue). %% A stateful interaction between a Client and a Server. Some Sessions @@ -71,8 +68,8 @@ %% will be deleted. -record(state, { - %% Clean Session Flag - clean_sess = false :: boolean(), + %% Clean Start Flag + clean_start = false :: boolean(), %% Client Binding: local | remote binding = local :: local | remote, @@ -150,9 +147,9 @@ -define(TIMEOUT, 60000). --define(INFO_KEYS, [clean_sess, client_id, username, client_pid, binding, created_at]). +-define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]). --define(STATE_KEYS, [clean_sess, client_id, username, binding, client_pid, old_client_pid, +-define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid, next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, await_rel_timeout, expiry_interval, enable_stats, force_gc_count, @@ -163,10 +160,9 @@ "Session(~s): " ++ Format, [State#state.client_id | Args])). %% @doc Start a Session --spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, term()}). -start_link(CleanSess, {ClientId, Username}, ClientPid) -> - gen_server:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], - [{hibernate_after, 10000}]). +-spec(start_link(map()) -> {ok, pid()} | {error, term()}). +start_link(ClientAttrs) -> + gen_server:start_link(?MODULE, ClientAttrs, [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -215,12 +211,12 @@ pubcomp(Session, PacketId) -> gen_server:cast(Session, {pubcomp, PacketId}). %% @doc Unsubscribe the topics --spec(unsubscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). +-spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok). unsubscribe(Session, TopicTable) -> gen_server:cast(Session, {unsubscribe, self(), TopicTable}). %% @doc Resume the session --spec(resume(pid(), mqtt_client_id(), pid()) -> ok). +-spec(resume(pid(), client_id(), pid()) -> ok). resume(Session, ClientId, ClientPid) -> gen_server:cast(Session, {resume, ClientId, ClientPid}). @@ -260,16 +256,19 @@ stats(#state{max_subscriptions = MaxSubscriptions, {deliver_msg, get(deliver_msg)}, {enqueue_msg, get(enqueue_msg)}]). -%% @doc Destroy the session --spec(destroy(pid(), mqtt_client_id()) -> ok). -destroy(Session, ClientId) -> - gen_server:cast(Session, {destroy, ClientId}). +%% @doc Discard the session +-spec(discard(pid(), client_id()) -> ok). +discard(Session, ClientId) -> + gen_server:cast(Session, {discard, ClientId}). %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- -init([CleanSess, {ClientId, Username}, ClientPid]) -> +init(#{clean_start := CleanStart, + client_id := ClientId, + username := Username, + client_pid := ClientPid}) -> process_flag(trap_exit, true), true = link(ClientPid), init_stats([deliver_msg, enqueue_msg]), @@ -280,7 +279,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> ForceGcCount = emqx_gc:conn_max_gc_count(), IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()), - State = #state{clean_sess = CleanSess, + State = #state{clean_start = CleanStart, binding = binding(ClientPid), client_id = ClientId, client_pid = ClientPid, @@ -300,8 +299,9 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> force_gc_count = ForceGcCount, ignore_loop_deliver = IgnoreLoopDeliver, created_at = os:timestamp()}, - emqx_sm:register_session(ClientId, CleanSess, info(State)), + %%emqx_sm:register_session(ClientId, info(State)), emqx_hooks:run('session.created', [ClientId, Username]), + io:format("Session started: ~p~n", [self()]), {ok, emit_stats(State), hibernate}. init_stats(Keys) -> @@ -313,7 +313,7 @@ binding(ClientPid) -> handle_pre_hibernate(State) -> {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. -handle_call({publish, Msg = #message{qos = ?QOS_2, packet_id = PacketId}}, _From, +handle_call({publish, Msg = #message{qos = ?QOS_2, headers = #{packet_id := PacketId}}}, _From, State = #state{awaiting_rel = AwaitingRel, await_rel_timer = Timer, await_rel_timeout = Timeout}) -> @@ -350,6 +350,7 @@ handle_cast({subscribe, From, TopicTable, AckFun}, ?LOG(info, "Subscribe ~p", [TopicTable], State), {GrantedQos, Subscriptions1} = lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> + io:format("SubOpts: ~p~n", [Opts]), Fastlane = lists:member(fastlane, Opts), NewQos = if Fastlane == true -> ?QOS_0; true -> get_value(qos, Opts) end, SubMap1 = @@ -373,6 +374,7 @@ handle_cast({subscribe, From, TopicTable, AckFun}, end, {[NewQos|QosAcc], SubMap1} end, {[], Subscriptions}, TopicTable), + io:format("GrantedQos: ~p~n", [GrantedQos]), AckFun(lists:reverse(GrantedQos)), {noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate}; @@ -456,7 +458,7 @@ handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({resume, ClientId, ClientPid}, State = #state{client_id = ClientId, client_pid = OldClientPid, - clean_sess = CleanSess, + clean_start = CleanStart, retry_timer = RetryTimer, await_rel_timer = AwaitTimer, expiry_timer = ExpireTimer}) -> @@ -477,7 +479,7 @@ handle_cast({resume, ClientId, ClientPid}, State1 = State#state{client_pid = ClientPid, binding = binding(ClientPid), old_client_pid = OldClientPid, - clean_sess = false, + clean_start = false, retry_timer = undefined, awaiting_rel = #{}, await_rel_timer = undefined, @@ -485,22 +487,23 @@ handle_cast({resume, ClientId, ClientPid}, %% Clean Session: true -> false? if - CleanSess =:= true -> - ?LOG(error, "CleanSess changed to false.", [], State1), - emqx_sm:register_session(ClientId, false, info(State1)); - CleanSess =:= false -> + CleanStart =:= true -> + ?LOG(error, "CleanSess changed to false.", [], State1); + %%TODO:: + %%emqx_sm:register_session(ClientId, info(State1)); + CleanStart =:= false -> ok end, %% Replay delivery and Dequeue pending messages {noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; -handle_cast({destroy, ClientId}, +handle_cast({discard, ClientId}, State = #state{client_id = ClientId, client_pid = undefined}) -> ?LOG(warning, "Destroyed", [], State), - shutdown(destroy, State); + shutdown(discard, State); -handle_cast({destroy, ClientId}, +handle_cast({discard, ClientId}, State = #state{client_id = ClientId, client_pid = OldClientPid}) -> ?LOG(warning, "kickout ~p", [OldClientPid], State), shutdown(conflict, State); @@ -533,11 +536,11 @@ handle_info({timeout, _Timer, expired}, State) -> shutdown(expired, State); handle_info({'EXIT', ClientPid, _Reason}, - State = #state{clean_sess = true, client_pid = ClientPid}) -> + State = #state{clean_start= true, client_pid = ClientPid}) -> {stop, normal, State}; handle_info({'EXIT', ClientPid, Reason}, - State = #state{clean_sess = false, + State = #state{clean_start = false, client_pid = ClientPid, expiry_interval = Interval}) -> ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), @@ -604,7 +607,7 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now, if Force orelse (Diff >= Interval) -> case {Type, Msg} of - {publish, Msg = #message{packet_id = PacketId}} -> + {publish, Msg = #message{headers = #{packet_id := PacketId}}} -> redeliver(Msg, State), Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); @@ -687,7 +690,7 @@ dispatch(Msg = #message{qos = QoS}, true -> enqueue_msg(Msg, State); false -> - Msg1 = Msg#message{packet_id = MsgId}, + Msg1 = emqx_message:set_header(packet_id, MsgId, Msg), deliver(Msg1, State), await(Msg1, next_msg_id(State)) end. @@ -701,7 +704,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) -> %%-------------------------------------------------------------------- redeliver(Msg = #message{qos = QoS}, State) -> - deliver(Msg#message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); + deliver(if QoS =:= ?QOS2 -> Msg; true -> emqx_message:set_flag(dup, Msg) end, State); redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> Pid ! {redeliver, {?PUBREL, PacketId}}. @@ -715,7 +718,7 @@ deliver(Msg, #state{client_pid = Pid, binding = remote}) -> %% Awaiting ACK for QoS1/QoS2 Messages %%-------------------------------------------------------------------- -await(Msg = #message{packet_id = PacketId}, +await(Msg = #message{headers = #{packet_id := PacketId}}, State = #state{inflight = Inflight, retry_timer = RetryTimer, retry_interval = Interval}) -> @@ -797,9 +800,8 @@ tune_qos(Topic, Msg = #message{qos = PubQoS}, %% Reset Dup %%-------------------------------------------------------------------- -reset_dup(Msg = #message{dup = true}) -> - Msg#message{dup = false}; -reset_dup(Msg) -> Msg. +reset_dup(Msg) -> + emqx_message:unset_flag(dup, Msg). %%-------------------------------------------------------------------- %% Next Msg Id diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index a3f9e88cb..d0e974456 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -24,7 +24,7 @@ -export([open_session/1, lookup_session/1, close_session/1]). -export([resume_session/1, discard_session/1]). --export([register_session/1, unregister_session/2]). +-export([register_session/1, unregister_session/1, unregister_session/2]). %% lock_session/1, create_session/1, unlock_session/1, @@ -42,17 +42,19 @@ start_link(StatsFun) -> open_session(Session = #{client_id := ClientId, clean_start := true}) -> with_lock(ClientId, fun() -> - case rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]) of + io:format("Nodelist: ~p~n", [ekka_membership:nodelist()]), + case rpc:multicall(ekka_membership:nodelist(), ?MODULE, discard_session, [ClientId]) of {_Res, []} -> ok; {_Res, BadNodes} -> emqx_log:error("[SM] Bad nodes found when lock a session: ~p", [BadNodes]) end, - {ok, emqx_session_sup:start_session(Session)} + io:format("Begin to start session: ~p~n", [Session]), + emqx_session_sup:start_session(Session) end); open_session(Session = #{client_id := ClientId, clean_start := false}) -> with_lock(ClientId, fun() -> - {ResL, _BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]), + {ResL, _BadNodes} = rpc:multicall(ekka_membership:nodelist(), ?MODULE, lookup_session, [ClientId]), case lists:flatten([Pid || Pid <- ResL, Pid =/= undefined]) of [] -> {ok, emqx_session_sup:start_session(Session)}; @@ -61,7 +63,7 @@ open_session(Session = #{client_id := ClientId, clean_start := false}) -> ok -> {ok, SessPid}; {error, Reason} -> emqx_log:error("[SM] Failed to resume session: ~p, ~p", [Session, Reason]), - {ok, emqx_session_sup:start_session(Session)} + emqx_session_sup:start_session(Session) end end end). @@ -109,6 +111,9 @@ with_lock(ClientId, Fun) -> register_session(ClientId) -> ets:insert(session, {ClientId, self()}). +unregister_session(ClientId) -> + unregister_session(ClientId, self()). + unregister_session(ClientId, Pid) -> case ets:lookup(session, ClientId) of [{_, Pid}] -> diff --git a/src/emqx_sm_locker.erl b/src/emqx_sm_locker.erl index fc61e6b06..150481979 100644 --- a/src/emqx_sm_locker.erl +++ b/src/emqx_sm_locker.erl @@ -24,10 +24,10 @@ %% @doc Lock a clientid -spec(lock(client_id()) -> boolean() | {error, term()}). lock(ClientId) -> - emqx_rpc:call(ekka:leader(), emqx_sm_locker, lock, [ClientId]). + rpc:call(ekka_membership:leader(), emqx_locker, lock, [ClientId]). %% @doc Unlock a clientid -spec(unlock(client_id()) -> ok). unlock(ClientId) -> - emqx_rpc:call(ekka:leader(), emqx_locker, unlock, [ClientId]). + rpc:call(ekka_membership:leader(), emqx_locker, unlock, [ClientId]). diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index 52138f62a..51a495f58 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -26,20 +26,15 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - %% Create tables - create_tabs(), + lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]), StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), SM = {emqx_sm, {emqx_sm, start_link, [StatsFun]}, permanent, 5000, worker, [emqx_sm]}, - {ok, {{one_for_all, 0, 3600}, [SM]}}. - -create_tabs() -> - lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]). + {ok, {{one_for_all, 10, 3600}, [SM]}}. create_tab(Tab) -> - emqx_tables:create(Tab, [public, ordered_set, named_table, - {write_concurrency, true}]). + emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]). diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index fc68b3cb5..fa4c25892 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -51,7 +51,8 @@ start_link() -> trace(publish, From, _Msg) when is_atom(From) -> %% Dont' trace '$SYS' publish ignore; -trace(publish, {ClientId, Username}, #message{topic = Topic, payload = Payload}) -> +trace(publish, #client{id = ClientId, username = Username}, + #message{topic = Topic, payload = Payload}) -> lager:info([{client, ClientId}, {topic, Topic}], "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); trace(publish, From, #message{topic = Topic, payload = Payload}) @@ -59,7 +60,6 @@ trace(publish, From, #message{topic = Topic, payload = Payload}) lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). - %%-------------------------------------------------------------------- %% Start/Stop Trace %%--------------------------------------------------------------------