From 282e341433bff1bcf47d1614b6a3d857b21ded3c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 21 Mar 2018 16:48:52 +0800 Subject: [PATCH] EMQ X R3.0 - Improve the design of pubsub and router --- include/emqx.hrl | 32 +- src/emqx.erl | 99 +++-- src/emqx_access_control.erl | 2 +- src/emqx_access_rule.erl | 25 +- src/emqx_acl_internal.erl | 2 +- src/emqx_acl_mod.erl | 2 +- src/emqx_alarm.erl | 2 +- src/emqx_app.erl | 2 +- src/emqx_auth_mod.erl | 2 +- src/emqx_base62.erl | 2 +- src/emqx_boot.erl | 2 +- src/emqx_bridge.erl | 5 +- src/emqx_bridge_sup.erl | 2 +- src/emqx_bridge_sup_sup.erl | 2 +- src/emqx_broker.erl | 409 ++++++++++++------ src/emqx_broker_helper.erl | 66 +++ ...mqx_pubsub_sup.erl => emqx_broker_sup.erl} | 88 ++-- src/emqx_cli.erl | 2 +- src/emqx_cm.erl | 2 +- src/emqx_cm_sup.erl | 2 +- src/emqx_config.erl | 2 +- src/emqx_connection.erl | 4 +- src/emqx_ctl.erl | 2 +- src/emqx_gc.erl | 2 +- src/emqx_gen_mod.erl | 2 +- src/emqx_guid.erl | 2 +- src/emqx_hooks.erl | 2 +- src/emqx_inflight.erl | 2 +- src/emqx_json.erl | 2 +- src/emqx_keepalive.erl | 2 +- src/emqx_lager_backend.erl | 2 +- src/emqx_locker.erl | 2 +- src/emqx_log.erl | 51 +++ src/emqx_message.erl | 2 +- src/emqx_metrics.erl | 2 +- src/emqx_misc.erl | 2 +- src/emqx_mod_presence.erl | 2 +- src/emqx_mod_rewrite.erl | 2 +- src/emqx_mod_subscription.erl | 2 +- src/emqx_mod_sup.erl | 2 +- src/emqx_modules.erl | 2 +- src/emqx_mqtt_app.erl | 2 +- src/emqx_mqtt_metrics.erl | 2 +- src/emqx_mqtt_props.erl | 2 +- src/emqx_mqtt_rscode.erl | 2 +- src/emqx_mqueue.erl | 2 +- src/emqx_net.erl | 2 +- src/emqx_packet.erl | 2 +- src/emqx_parser.erl | 2 +- src/emqx_plugins.erl | 2 +- src/emqx_pmon.erl | 2 +- src/emqx_pool_sup.erl | 2 +- src/emqx_pooler.erl | 2 +- src/emqx_protocol.erl | 4 +- src/emqx_pubsub.erl | 254 ----------- src/emqx_router.erl | 333 +++++++------- src/emqx_router_helper.erl | 158 +++++++ src/emqx_router_sup.erl | 21 +- src/emqx_rpc.erl | 2 +- src/emqx_server.erl | 326 -------------- src/emqx_session.erl | 10 +- src/emqx_session_sup.erl | 14 +- src/emqx_shared_pubsub.erl | 171 ++++++++ src/emqx_sm.erl | 53 ++- src/emqx_sm_helper.erl | 2 +- src/emqx_sm_locker.erl | 43 ++ src/emqx_sm_sup.erl | 2 +- src/emqx_stats.erl | 18 +- src/emqx_sup.erl | 10 +- src/emqx_sys.erl | 177 ++++++++ src/emqx_sysmon.erl | 2 +- src/emqx_sysmon_sup.erl | 2 +- src/emqx_time.erl | 2 +- src/emqx_topic.erl | 61 +-- src/emqx_trace.erl | 43 +- src/emqx_trace_sup.erl | 2 +- src/emqx_trie.erl | 32 +- src/emqx_vm.erl | 2 +- src/emqx_ws.erl | 2 +- src/emqx_ws_connection.erl | 4 +- src/emqx_ws_connection_sup.erl | 8 +- 81 files changed, 1457 insertions(+), 1168 deletions(-) create mode 100644 src/emqx_broker_helper.erl rename src/{emqx_pubsub_sup.erl => emqx_broker_sup.erl} (50%) create mode 100644 src/emqx_log.erl delete mode 100644 src/emqx_pubsub.erl create mode 100644 src/emqx_router_helper.erl delete mode 100644 src/emqx_server.erl create mode 100644 src/emqx_shared_pubsub.erl create mode 100644 src/emqx_sm_locker.erl create mode 100644 src/emqx_sys.erl diff --git a/include/emqx.hrl b/include/emqx.hrl index 386136a9c..c1b366b86 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -36,13 +36,41 @@ -define(SHARE, <<"$share/">>). %% Shared Topic +%%-------------------------------------------------------------------- +%% Client and Session +%%-------------------------------------------------------------------- + +-type(topic() :: binary()). + +-type(subscriber() :: pid() | binary() | {binary(), pid()}). + +-type(suboption() :: {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). + +-type(client_id() :: binary() | atom()). + +-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). + +-type(client() :: #{zone := atom(), + node := atom(), + clientid := client_id(), + protocol := protocol(), + connector => atom(), + peername => {inet:ip_address(), inet:port_number()}, + username => binary(), + atom() => term()}). + +-type(session() :: #{client_id := client_id(), + clean_start := boolean(), + expiry_interval := non_neg_integer()}). + + %%-------------------------------------------------------------------- %% Message and Delivery %%-------------------------------------------------------------------- -type(message_id() :: binary() | undefined). --type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). +%% -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). -type(message_from() :: #{zone := atom(), node := atom(), @@ -148,7 +176,7 @@ %% Route %%-------------------------------------------------------------------- --record(route, { topic :: binary(), node :: node() }). +-record(route, { topic :: binary(), dest :: {binary(), node()} | node() }). -type(route() :: #route{}). diff --git a/src/emqx.erl b/src/emqx.erl index 73a750894..386fc222c 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -32,8 +32,11 @@ -export([subscribe/1, subscribe/2, subscribe/3, publish/1, unsubscribe/1, unsubscribe/2]). -%% PubSub Management API --export([setqos/3, topics/0, subscriptions/1, subscribers/1, subscribed/2]). +%% PubSub management API +-export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). + +%% Get/Set suboptions +-export([getopts/2, setopts/3]). %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]). @@ -46,21 +49,13 @@ -type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). --type(subid() :: binary()). - --type(subscriber() :: pid() | subid() | {subid(), pid()}). - --type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). - --export_type([subscriber/0, suboption/0]). - -define(APP, ?MODULE). %%-------------------------------------------------------------------- %% Bootstrap, environment, configuration, is_running... %%-------------------------------------------------------------------- -%% @doc Start emqx application. +%% @doc Start emqx application -spec(start() -> ok | {error, term()}). start() -> application:start(?APP). @@ -68,7 +63,7 @@ start() -> application:start(?APP). -spec(stop() -> ok | {error, term()}). stop() -> application:stop(?APP). -%% @doc Get Environment +%% @doc Get environment -spec(env(Key :: atom()) -> {ok, any()} | undefined). env(Key) -> application:get_env(?APP, Key). @@ -76,7 +71,7 @@ env(Key) -> application:get_env(?APP, Key). -spec(env(Key :: atom(), Default :: any()) -> undefined | any()). env(Key, Default) -> application:get_env(?APP, Key, Default). -%% @doc Is running? +%% @doc Is emqx running? -spec(is_running(node()) -> boolean()). is_running(Node) -> case rpc:call(Node, erlang, whereis, [?APP]) of @@ -110,12 +105,9 @@ start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}). -% start_listener({Proto, ListenOn, Opts}) when Proto == api -> -% {ok, _} = mochiweb:start_http('mqtt:api', ListenOn, Opts, emqx_http:http_handler()). - start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), - MFArgs = {emqx_client, start_link, [Env]}, + MFArgs = {emqx_connection, start_link, [Env]}, {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs). listeners() -> @@ -169,58 +161,71 @@ merge_sockopts(Options) -> emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]). %%-------------------------------------------------------------------- -%% PubSub APIs +%% PubSub API %%-------------------------------------------------------------------- -%% @doc Subscribe --spec(subscribe(iodata()) -> ok | {error, term()}). +-spec(subscribe(topic() | string()) -> ok | {error, term()}). subscribe(Topic) -> - emqx_server:subscribe(iolist_to_binary(Topic)). + emqx_broker:subscribe(iolist_to_binary(Topic)). --spec(subscribe(iodata(), subscriber()) -> ok | {error, term()}). +-spec(subscribe(topic() | iodata(), subscriber() | string()) -> ok | {error, term()}). subscribe(Topic, Subscriber) -> - emqx_server:subscribe(iolist_to_binary(Topic), Subscriber). + emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)). --spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | {error, term()}). +-spec(subscribe(topic() | iodata(), subscriber() | string(), [suboption()]) -> ok | {error, term()}). subscribe(Topic, Subscriber, Options) -> - emqx_server:subscribe(iolist_to_binary(Topic), Subscriber, Options). + emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Subscriber), Options). -%% @doc Publish MQTT Message --spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). +%% @doc Publish Message +-spec(publish(message()) -> {ok, delivery()} | ignore). publish(Msg) -> - emqx_server:publish(Msg). + emqx_broker:publish(Msg). -%% @doc Unsubscribe --spec(unsubscribe(iodata()) -> ok | {error, term()}). +-spec(unsubscribe(topic() | string()) -> ok | {error, term()}). unsubscribe(Topic) -> - emqx_server:unsubscribe(iolist_to_binary(Topic)). + emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(iodata(), subscriber()) -> ok | {error, term()}). +-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}). unsubscribe(Topic, Subscriber) -> - emqx_server:unsubscribe(iolist_to_binary(Topic), Subscriber). + emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)). %%-------------------------------------------------------------------- -%% PubSub Management API +%% PubSub management API %%-------------------------------------------------------------------- --spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok). -setqos(Topic, Subscriber, Qos) -> - emqx_server:setqos(iolist_to_binary(Topic), Subscriber, Qos). +-spec(getopts(topic() | string(), subscriber()) -> [suboption()]). +getopts(Topic, Subscriber) -> + emqx_broker:getopts(iolist_to_binary(Topic), list_to_subid(Subscriber)). --spec(topics() -> [binary()]). +-spec(setopts(topic() | string(), subscriber(), [suboption()]) -> ok). +setopts(Topic, Subscriber, Options) when is_list(Options) -> + emqx_broker:setopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options). + +-spec(topics() -> list(topic())). topics() -> emqx_router:topics(). --spec(subscribers(iodata()) -> list(subscriber())). +-spec(subscribers(topic() | string()) -> list(subscriber())). subscribers(Topic) -> - emqx_server:subscribers(iolist_to_binary(Topic)). + emqx_broker:subscribers(iolist_to_binary(Topic)). --spec(subscriptions(subscriber()) -> [{subscriber(), binary(), list(suboption())}]). +-spec(subscriptions(subscriber() | string()) -> [{topic(), list(suboption())}]). subscriptions(Subscriber) -> - emqx_server:subscriptions(Subscriber). + emqx_broker:subscriptions(Subscriber). --spec(subscribed(iodata(), subscriber()) -> boolean()). +-spec(subscribed(topic() | string(), subscriber()) -> boolean()). subscribed(Topic, Subscriber) -> - emqx_server:subscribed(iolist_to_binary(Topic), Subscriber). + emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)). + +list_to_subid(SubId) when is_binary(SubId) -> + SubId; +list_to_subid(SubId) when is_list(SubId) -> + iolist_to_binary(SubId); +list_to_subid(SubPid) when is_pid(SubPid) -> + SubPid; +list_to_subid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> + {SubId, SubPid}; +list_to_subid({SubId, SubPid}) when is_list(SubId), is_pid(SubPid) -> + {iolist_to_binary(SubId), SubPid}. %%-------------------------------------------------------------------- %% Hooks API @@ -257,7 +262,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> - lager:error("EMQ shutdown for ~s", [Reason]), + emqx_log:error("EMQ shutdown for ~s", [Reason]), emqx_plugins:unload(), lists:foreach(fun application:stop/1, [emqx, ekka, mochiweb, esockd, gproc]). @@ -268,5 +273,5 @@ reboot() -> %% Debug %%-------------------------------------------------------------------- -dump() -> lists:append([emqx_server:dump(), emqx_router:dump()]). +dump() -> lists:append([emqx_broker:dump(), emqx_router:dump()]). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 05d3cbacb..10aa68e91 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 9c1d3f4d1..1d54d0fef 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,8 +25,6 @@ -type(access() :: subscribe | publish | pubsub). --type(topic() :: binary()). - -type(rule() :: {allow, all} | {allow, who(), access(), list(topic())} | {deny, all} | @@ -42,7 +40,7 @@ compile({A, all}) when ?ALLOW_DENY(A) -> {A, all}; -compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A) andalso is_binary(Topic) -> +compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), is_binary(Topic) -> {A, compile(who, Who), Access, [compile(topic, Topic)]}; compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) -> @@ -70,7 +68,7 @@ compile(topic, {eq, Topic}) -> compile(topic, Topic) -> Words = emqx_topic:words(bin(Topic)), case 'pattern?'(Words) of - true -> {pattern, Words}; + true -> {pattern, Words}; false -> Words end. @@ -83,13 +81,14 @@ bin(L) when is_list(L) -> bin(B) when is_binary(B) -> B. -%% @doc Match Access Rule +%% @doc Match access rule -spec(match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> {matched, AllowDeny}; match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> - case match_who(Client, Who) andalso match_topics(Client, Topic, TopicFilters) of + case match_who(Client, Who) + andalso match_topics(Client, Topic, TopicFilters) of true -> {matched, AllowDeny}; false -> nomatch end. @@ -123,15 +122,11 @@ match_topics(_Client, _Topic, []) -> false; match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) -> TopicFilter = feed_var(Client, PatternFilter), - case match_topic(emqx_topic:words(Topic), TopicFilter) of - true -> true; - false -> match_topics(Client, Topic, Filters) - end; + match_topic(emqx_topic:words(Topic), TopicFilter) + orelse match_topics(Client, Topic, Filters); match_topics(Client, Topic, [TopicFilter|Filters]) -> - case match_topic(emqx_topic:words(Topic), TopicFilter) of - true -> true; - false -> match_topics(Client, Topic, Filters) - end. + match_topic(emqx_topic:words(Topic), TopicFilter) + orelse match_topics(Client, Topic, Filters). match_topic(Topic, {eq, TopicFilter}) -> Topic =:= TopicFilter; diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 2cd1e5acb..030af96c3 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_acl_mod.erl b/src/emqx_acl_mod.erl index 9da3396b4..40f72f898 100644 --- a/src/emqx_acl_mod.erl +++ b/src/emqx_acl_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 1688e7bb2..83e957047 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 48648d64e..00acf0d76 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index 658eea9de..bc2057a06 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_base62.erl b/src/emqx_base62.erl index a3c00a979..a997912eb 100644 --- a/src/emqx_base62.erl +++ b/src/emqx_base62.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_boot.erl b/src/emqx_boot.erl index 8d103f3f0..26348b271 100644 --- a/src/emqx_boot.erl +++ b/src/emqx_boot.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 2ef5a16a2..098b8a41a 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -70,7 +70,8 @@ init([Pool, Id, Node, Topic, Options]) -> true -> true = erlang:monitor_node(Node, true), Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_server:subscribe(Topic, self(), [local, {share, Share}, {qos, ?QOS_0}]), + %% TODO:: local??? + emqx_broker:subscribe(Topic, self(), [local, {share, Share}, {qos, ?QOS_0}]), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}], diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index 5e0deed34..125c47124 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_bridge_sup_sup.erl b/src/emqx_bridge_sup_sup.erl index 762a5e8a4..0f2d85212 100644 --- a/src/emqx_bridge_sup_sup.erl +++ b/src/emqx_bridge_sup_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 36b174404..067e2c6da 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -20,183 +20,328 @@ -include("emqx.hrl"). --include("emqx_internal.hrl"). +-export([start_link/2]). -%% API Function Exports --export([start_link/0]). +-export([subscribe/1, subscribe/2, subscribe/3, unsubscribe/1, unsubscribe/2]). -%% Event API --export([subscribe/1, notify/2]). +-export([publish/1, publish/2]). -%% Broker API --export([version/0, uptime/0, datetime/0, sysdescr/0, info/0]). +-export([subscriptions/1, subscribers/1, subscribed/2]). -%% Tick API --export([start_tick/1, stop_tick/1]). +-export([topics/0]). + +-export([getopts/2, setopts/3]). + +-export([dump/0]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {started_at, sys_interval, heartbeat, ticker, version, sysdescr}). +-record(state, {pool, id, subids :: map(), submon :: emqx_pmon:pmon()}). --define(APP, emqx). +-define(BROKER, ?MODULE). --define(SERVER, ?MODULE). - --define(BROKER_TAB, mqtt_broker). - -%% $SYS Topics of Broker --define(SYSTOP_BROKERS, [ - version, % Broker version - uptime, % Broker uptime - datetime, % Broker local datetime - sysdescr % Broker description -]). +-define(TIMEOUT, 120000). %%-------------------------------------------------------------------- -%% API +%% Start a broker %%-------------------------------------------------------------------- -%% @doc Start the broker --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). +start_link(Pool, Id) -> + gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]). -%% @doc Subscribe broker event --spec(subscribe(EventType :: any()) -> ok). -subscribe(EventType) -> - gproc:reg({p, l, {broker, EventType}}). - -%% @doc Notify broker event --spec(notify(EventType :: any(), Event :: any()) -> ok). -notify(EventType, Event) -> - gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}). +%%-------------------------------------------------------------------- +%% Sub/Unsub +%%-------------------------------------------------------------------- -%% @doc Get broker info --spec(info() -> list(tuple())). -info() -> - [{version, version()}, - {sysdescr, sysdescr()}, - {uptime, uptime()}, - {datetime, datetime()}]. +-spec(subscribe(topic()) -> ok | {error, term()}). +subscribe(Topic) when is_binary(Topic) -> + subscribe(Topic, self()). -%% @doc Get broker version --spec(version() -> string()). -version() -> - {ok, Version} = application:get_key(?APP, vsn), Version. +-spec(subscribe(topic(), subscriber()) -> ok | {error, term()}). +subscribe(Topic, Subscriber) when is_binary(Topic) -> + subscribe(Topic, Subscriber, []). -%% @doc Get broker description --spec(sysdescr() -> string()). -sysdescr() -> - {ok, Descr} = application:get_key(?APP, description), Descr. +-spec(subscribe(topic(), subscriber(), [suboption()]) -> ok | {error, term()}). +subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> + subscribe(Topic, Subscriber, Options, ?TIMEOUT). -%% @doc Get broker uptime --spec(uptime() -> string()). -uptime() -> gen_server:call(?SERVER, uptime). +-spec(subscribe(topic(), subscriber(), [suboption()], timeout()) + -> ok | {error, term()}). +subscribe(Topic, Subscriber, Options, Timeout) -> + {Topic1, Options1} = emqx_topic:parse(Topic, Options), + SubReq = {subscribe, Topic1, with_subpid(Subscriber), Options1}, + async_call(pick(Subscriber), SubReq, Timeout). -%% @doc Get broker datetime --spec(datetime() -> string()). -datetime() -> - {{Y, M, D}, {H, MM, S}} = calendar:local_time(), - lists:flatten( - io_lib:format( - "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). +-spec(unsubscribe(topic()) -> ok | {error, term()}). +unsubscribe(Topic) when is_binary(Topic) -> + unsubscribe(Topic, self()). -%% @doc Start a tick timer. -start_tick(Msg) -> - start_tick(emqx:env(broker_sys_interval, 60000), Msg). +-spec(unsubscribe(topic(), subscriber()) -> ok | {error, term()}). +unsubscribe(Topic, Subscriber) when is_binary(Topic) -> + unsubscribe(Topic, Subscriber, ?TIMEOUT). -start_tick(0, _Msg) -> - undefined; -start_tick(Interval, Msg) when Interval > 0 -> - {ok, TRef} = timer:send_interval(Interval, Msg), TRef. +-spec(unsubscribe(topic(), subscriber(), timeout()) + -> ok | {error, term()}). +unsubscribe(Topic, Subscriber, Timeout) -> + {Topic1, _} = emqx_topic:parse(Topic), + UnsubReq = {unsubscribe, Topic1, with_subpid(Subscriber)}, + async_call(pick(Subscriber), UnsubReq, Timeout). -%% @doc Stop tick timer -stop_tick(undefined) -> +%%-------------------------------------------------------------------- +%% Publish +%%-------------------------------------------------------------------- + +-spec(publish(message()) -> delivery() | stopped). +publish(Msg = #message{from = From}) -> + emqx_tracer:trace(publish, From, Msg), + case emqx_hooks:run('message.publish', [], Msg) of + {ok, Msg1 = #message{topic = Topic}} -> + publish(Topic, Msg1); + {stop, Msg1} -> + emqx_log:warning("Stop publishing: ~s", [emqx_message:format(Msg1)]), + stopped + end. + +publish(Topic, Msg) -> + route(emqx_router:match_routes(Topic), delivery(Msg)). + +route([], Delivery = #delivery{message = Msg}) -> + emqx_hooks:run('message.dropped', [undefined, Msg]), + dropped(Msg#message.topic), Delivery; + +route([{To, Node}], Delivery) when Node =:= node() -> + dispatch(To, Delivery); + +route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) -> + forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]}); + +route([{To, Group}], Delivery) when is_binary(Group) -> + emqx_shared_pubsub:dispatch(Group, To, Delivery); + +route(Routes, Delivery) -> + lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). + +%% @doc Forward message to another node. +forward(Node, To, Delivery) -> + case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of + {badrpc, Reason} -> + emqx_log:error("[Broker] Failed to forward msg to ~s: ~s", [Node, Reason]), + Delivery; + Delivery1 -> Delivery1 + end. + +-spec(dispatch(topic(), delivery()) -> delivery()). +dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> + case subscribers(Topic) of + [] -> + emqx_hooks:run('message.dropped', [undefined, Msg]), + dropped(Topic), Delivery; + [Sub] -> %% optimize? + dispatch(Sub, Topic, Msg), + Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]}; + Subscribers -> + Count = lists:foldl(fun(Sub, Acc) -> + dispatch(Sub, Topic, Msg), Acc + 1 + end, 0, Subscribers), + Delivery#delivery{flows = [{dispatch, Topic, Count}|Flows]} + end. + +dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> + SubPid ! {dispatch, Topic, Msg}; +dispatch({SubId, SubPid}, Topic, Msg) when is_binary(SubId), is_pid(SubPid) -> + SubPid ! {dispatch, Topic, Msg}; +dispatch(SubId, Topic, Msg) when is_binary(SubId) -> + emqx_sm:dispatch(SubId, Topic, Msg); +dispatch({share, _Group, _Sub}, _Topic, _Msg) -> + ignore. + +dropped(<<"$SYS/", _/binary>>) -> ok; -stop_tick(TRef) -> - timer:cancel(TRef). +dropped(_Topic) -> + emqx_metrics:inc('messages/dropped'). + +delivery(Msg) -> + #delivery{message = Msg, flows = []}. + +subscribers(Topic) -> + try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end. + +subscriptions(Subscriber) -> + lists:map(fun({_, {share, _Group, Topic}}) -> + subscription(Topic, Subscriber); + ({_, Topic}) -> + subscription(Topic, Subscriber) + end, ets:lookup(subscription, Subscriber)). + +subscription(Topic, Subscriber) -> + {Topic, ets:lookup_element(suboption, {Topic, Subscriber}, 2)}. + +-spec(subscribed(topic(), subscriber()) -> boolean()). +subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> + ets:member(suboption, {Topic, SubPid}); +subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) -> + length(ets:match_object(suboption, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1; +subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) -> + ets:member(suboption, {Topic, {SubId, SubPid}}). + +topics() -> emqx_router:topics(). + +getopts(Topic, Subscriber) when is_binary(Topic) -> + try ets:lookup_element(suboption, {Topic, Subscriber}, 2) catch error:badarg ->[] end. + +setopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) -> + gen_server:call(pick(Subscriber), {setopts, Topic, Subscriber, Opts}). + +with_subpid(SubPid) when is_pid(SubPid) -> + SubPid; +with_subpid(SubId) when is_binary(SubId) -> + {SubId, self()}; +with_subpid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> + {SubId, SubPid}. + +async_call(Broker, Msg, Timeout) -> + From = {self(), Tag = make_ref()}, + ok = gen_server:cast(Broker, {From, Msg}), + receive + {Tag, Reply} -> Reply + after Timeout -> + {error, timeout} + end. + +pick(SubPid) when is_pid(SubPid) -> + gproc_pool:pick_worker(broker, SubPid); +pick(SubId) when is_binary(SubId) -> + gproc_pool:pick_worker(broker, SubId); +pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> + pick(SubId). + +dump() -> + [{Tab, ets:tab2list(Tab)} || Tab <- [subscription, subscriber, suboption]]. %%-------------------------------------------------------------------- -%% gen_server Callbacks +%% gen_server callbacks %%-------------------------------------------------------------------- -init([]) -> - emqx_time:seed(), - ets:new(?BROKER_TAB, [set, public, named_table]), - % Tick - {ok, #state{started_at = os:timestamp(), - heartbeat = start_tick(1000, heartbeat), - version = list_to_binary(version()), - sysdescr = list_to_binary(sysdescr()), - ticker = start_tick(tick)}, hibernate}. +init([Pool, Id]) -> + gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #state{pool = Pool, id = Id, subids = #{}, submon = emqx_pmon:new()}}. -handle_call(uptime, _From, State) -> - {reply, uptime(State), State}; +handle_call({setopts, Topic, Subscriber, Opts}, _From, State) -> + case ets:lookup(suboption, {Topic, Subscriber}) of + [{_, OldOpts}] -> + Opts1 = lists:usort(lists:umerge(Opts, OldOpts)), + ets:insert(suboption, {{Topic, Subscriber}, Opts1}), + {reply, ok, State}; + [] -> + {reply, {error, not_found}, State} + end; -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). +handle_call(Request, _From, State) -> + emqx_log:error("[Broker] Unexpected request: ~p", [Request]), + {reply, ignore, State}. + +handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) -> + case ets:lookup(suboption, {Topic, Subscriber}) of + [] -> + Group = proplists:get_value(share, Options), + true = do_subscribe(Group, Topic, Subscriber, Options), + emqx_shared_pubsub:subscribe(Group, Topic, subpid(Subscriber)), + emqx_router:add_route(From, Topic, dest(Options)), + {noreply, monitor_subscriber(Subscriber, State)}; + [_] -> + gen_server:reply(From, ok), + {noreply, State} + end; + +handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) -> + case ets:lookup(suboption, {Topic, Subscriber}) of + [{_, Options}] -> + Group = proplists:get_value(share, Options), + true = do_unsubscribe(Group, Topic, Subscriber), + emqx_shared_pubsub:unsubscribe(Group, Topic, subpid(Subscriber)), + case ets:member(subscriber, Topic) of + false -> emqx_router:del_route(From, Topic, dest(Options)); + true -> gen_server:reply(From, ok) + end; + [] -> gen_server:reply(From, ok) + end, + {noreply, State}; handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + emqx_log:error("[Broker] Unexpected msg: ~p", [Msg]), + {noreply, State}. -handle_info(heartbeat, State) -> - publish(uptime, list_to_binary(uptime(State))), - publish(datetime, list_to_binary(datetime())), - {noreply, State, hibernate}; - -handle_info(tick, State = #state{version = Version, sysdescr = Descr}) -> - retain(brokers), - retain(version, Version), - retain(sysdescr, Descr), - {noreply, State, hibernate}; +handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{subids = SubIds}) -> + Subscriber = case maps:find(SubPid, SubIds) of + {ok, SubId} -> {SubId, SubPid}; + error -> SubPid + end, + Topics = lists:map(fun({_, {share, _, Topic}}) -> + Topic; + ({_, Topic}) -> + Topic + end, ets:lookup(subscription, Subscriber)), + lists:foreach(fun(Topic) -> + case ets:lookup(suboption, {Topic, Subscriber}) of + [{_, Options}] -> + Group = proplists:get_value(share, Options), + true = do_unsubscribe(Group, Topic, Subscriber), + case ets:member(subscriber, Topic) of + false -> emqx_router:del_route(Topic, dest(Options)); + true -> ok + end; + [] -> ok + end + end, Topics), + {noreply, demonitor_subscriber(SubPid, State)}; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + emqx_log:error("[Broker] Unexpected info: ~p", [Info]), + {noreply, State}. -terminate(_Reason, #state{heartbeat = Hb, ticker = TRef}) -> - stop_tick(Hb), - stop_tick(TRef), - ok. +terminate(_Reason, #state{pool = Pool, id = Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- -%% Internal functions +%% Internal Functions %%-------------------------------------------------------------------- -retain(brokers) -> - Payload = list_to_binary(string:join([atom_to_list(N) || - N <- ekka_mnesia:running_nodes()], ",")), - Msg = emqx_message:make(broker, <<"$SYS/brokers">>, Payload), - emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))). +do_subscribe(Group, Topic, Subscriber, Options) -> + ets:insert(subscription, {Subscriber, shared(Group, Topic)}), + ets:insert(subscriber, {Topic, shared(Group, Subscriber)}), + ets:insert(suboption, {{Topic, Subscriber}, Options}). -retain(Topic, Payload) when is_binary(Payload) -> - Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload), - emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))). +do_unsubscribe(Group, Topic, Subscriber) -> + ets:delete_object(subscription, {Subscriber, shared(Group, Topic)}), + ets:delete_object(subscriber, {Topic, shared(Group, Subscriber)}), + ets:delete(suboption, {Topic, Subscriber}). -publish(Topic, Payload) when is_binary(Payload) -> - Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload), - emqx:publish(emqx_message:set_flag(sys, Msg)). +monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) -> + State#state{submon = SubMon:monitor(SubPid)}; -uptime(#state{started_at = Ts}) -> - Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, - lists:flatten(uptime(seconds, Secs)). +monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) -> + State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}. -uptime(seconds, Secs) when Secs < 60 -> - [integer_to_list(Secs), " seconds"]; -uptime(seconds, Secs) -> - [uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"]; -uptime(minutes, M) when M < 60 -> - [integer_to_list(M), " minutes, "]; -uptime(minutes, M) -> - [uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "]; -uptime(hours, H) when H < 24 -> - [integer_to_list(H), " hours, "]; -uptime(hours, H) -> - [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "]; -uptime(days, D) -> - [integer_to_list(D), " days,"]. +demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) -> + State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}. + +dest(Options) -> + case proplists:get_value(share, Options) of + undefined -> node(); + Group -> {Group, node()} + end. + +subpid(SubPid) when is_pid(SubPid) -> + SubPid; +subpid({_SubId, SubPid}) when is_pid(SubPid) -> + SubPid. + +shared(undefined, Name) -> Name; +shared(Group, Name) -> {share, Group, Name}. diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl new file mode 100644 index 000000000..ec40e6cff --- /dev/null +++ b/src/emqx_broker_helper.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_broker_helper). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {stats_fun, stats_timer}). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). +start_link(StatsFun) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([StatsFun]) -> + {ok, TRef} = timer:send_interval(1000, stats), + {ok, #state{stats_fun = StatsFun, stats_timer = TRef}}. + +handle_call(Req, _From, State) -> + emqx_log:error("[BrokerHelper] Unexpected request: ~p", [Req]), + {reply, ignore, State}. + +handle_cast(Msg, State) -> + emqx_log:error("[BrokerHelper] Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info(stats, State = #state{stats_fun = StatsFun}) -> + StatsFun(), {noreply, State, hibernate}; + +handle_info(Info, State) -> + emqx_log:error("[BrokerHelper] Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #state{stats_timer = TRef}) -> + timer:cancel(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/src/emqx_pubsub_sup.erl b/src/emqx_broker_sup.erl similarity index 50% rename from src/emqx_pubsub_sup.erl rename to src/emqx_broker_sup.erl index a9978d011..96c004b49 100644 --- a/src/emqx_pubsub_sup.erl +++ b/src/emqx_broker_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,70 +14,80 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_pubsub_sup). +-module(emqx_broker_sup). -behaviour(supervisor). -%% API --export([start_link/0, pubsub_pool/0]). +-export([start_link/0]). -%% Supervisor callbacks -export([init/1]). -define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -pubsub_pool() -> - hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). - %%-------------------------------------------------------------------- -%% Supervisor Callbacks +%% Supervisor callbacks %%-------------------------------------------------------------------- init([]) -> - {ok, Env} = emqx:env(pubsub), - %% Create ETS Tables - [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]], - {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }. + %% Create the pubsub tables + create_tabs(), + + %% Shared pubsub + Shared = {shared_pubsub, {emqx_shared_pubsub, start_link, []}, + permanent, 5000, worker, [emqx_shared_pubsub]}, + + %% Broker helper + Helper = {broker_helper, {emqx_broker_helper, start_link, [stats_fun()]}, + permanent, 5000, worker, [emqx_broker_helper]}, + + %% Broker pool + PoolArgs = [broker, hash, emqx_sys:schedulers() * 2, + {emqx_broker, start_link, []}], + + PoolSup = emqx_pool_sup:spec(broker_pool, PoolArgs), + + {ok, {{one_for_all, 0, 3600}, [Shared, Helper, PoolSup]}}. %%-------------------------------------------------------------------- -%% Pool +%% Create tables %%-------------------------------------------------------------------- -pool_size(Env) -> - Schedulers = erlang:system_info(schedulers), - proplists:get_value(pool_size, Env, Schedulers). +create_tabs() -> + lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]). -pool_sup(Name, Env) -> - Pool = list_to_atom(atom_to_list(Name) ++ "_pool"), - Mod = list_to_atom("emqx_" ++ atom_to_list(Name)), - MFA = {Mod, start_link, [Env]}, - emqx_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]). +create_tab(suboption) -> + %% Suboption: {Topic, Sub} -> [{qos, 1}] + ensure_tab(suboption, [set | ?CONCURRENCY_OPTS]); -%%-------------------------------------------------------------------- -%% Create PubSub Tables -%%-------------------------------------------------------------------- - -create_tab(mqtt_subproperty) -> - %% Subproperty: {Topic, Sub} -> [{qos, 1}] - ensure_tab(mqtt_subproperty, [public, named_table, set | ?CONCURRENCY_OPTS]); - -create_tab(mqtt_subscriber) -> +create_tab(subscriber) -> %% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN %% duplicate_bag: o(1) insert - ensure_tab(mqtt_subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + ensure_tab(subscriber, [duplicate_bag | ?CONCURRENCY_OPTS]); -create_tab(mqtt_subscription) -> +create_tab(subscription) -> %% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN %% bag: o(n) insert - ensure_tab(mqtt_subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]). + ensure_tab(subscription, [bag | ?CONCURRENCY_OPTS]). ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end. + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, lists:usort([public, named_table | Opts])); + Tab -> Tab + end. + +%%-------------------------------------------------------------------- +%% Stats function +%%-------------------------------------------------------------------- + +stats_fun() -> + fun() -> + emqx_stats:setstat('subscribers/count', 'subscribers/max', + ets:info(subscriber, size)), + emqx_stats:setstat('subscriptions/count', 'subscriptions/max', + ets:info(subscription, size)) + end. diff --git a/src/emqx_cli.erl b/src/emqx_cli.erl index 97a5ca298..8ba645122 100644 --- a/src/emqx_cli.erl +++ b/src/emqx_cli.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 2c186571b..c929ecb1f 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index bbaf0d855..b73b2f5f5 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_config.erl b/src/emqx_config.erl index c9d71feef..15a3a014a 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index bb80cb0fa..2b2f36dee 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_conn). +-module(emqx_connection). -behaviour(gen_server). diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index ee1616b52..b39d4ea83 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 59d7ff8cc..2bc9f75c9 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_gen_mod.erl b/src/emqx_gen_mod.erl index 9b3e0ee3c..54996dead 100644 --- a/src/emqx_gen_mod.erl +++ b/src/emqx_gen_mod.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_guid.erl b/src/emqx_guid.erl index e03baab4d..543233887 100644 --- a/src/emqx_guid.erl +++ b/src/emqx_guid.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 530bf0ad3..369df3f41 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index b3ea24841..718e1dc79 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 6b204474f..b3f426113 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 7cfc4f48d..45158d709 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_lager_backend.erl b/src/emqx_lager_backend.erl index c675481c5..735b71467 100644 --- a/src/emqx_lager_backend.erl +++ b/src/emqx_lager_backend.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_locker.erl b/src/emqx_locker.erl index 8d85b0c89..196a3aa69 100644 --- a/src/emqx_locker.erl +++ b/src/emqx_locker.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_log.erl b/src/emqx_log.erl new file mode 100644 index 000000000..50f85c562 --- /dev/null +++ b/src/emqx_log.erl @@ -0,0 +1,51 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_log). + +-compile({no_auto_import,[error/1]}). + +-export([debug/1, debug/2, + info/1, info/2, + warning/1, warning/2, + error/1, error/2, + critical/1, critical/2]). + +debug(Msg) -> + lager:debug(Msg). +debug(Format, Args) -> + lager:debug(Format, Args). + +info(Msg) -> + lager:info(Msg). +info(Format, Args) -> + lager:info(Format, Args). + +warning(Msg) -> + lager:warning(Msg). +warning(Format, Args) -> + lager:warning(Format, Args). + +error(Msg) -> + lager:error(Msg). +error(Format, Args) -> + lager:error(Format, Args). + +critical(Msg) -> + lager:critical(Msg). +critical(Format, Args) -> + lager:critical(Format, Args). + diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 2574418a9..acbee1ce7 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index cb477aaa9..f74537218 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d10eb6415..bccdbbdf5 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index a0d8228d5..559556443 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 81480f212..87e70021e 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index 7230a6dd2..b6e3726e5 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mod_sup.erl b/src/emqx_mod_sup.erl index 016708351..f3e24cf86 100644 --- a/src/emqx_mod_sup.erl +++ b/src/emqx_mod_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index b3b8293f6..4974abf38 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqtt_app.erl b/src/emqx_mqtt_app.erl index e947855e1..47e1eaba0 100644 --- a/src/emqx_mqtt_app.erl +++ b/src/emqx_mqtt_app.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqtt_metrics.erl b/src/emqx_mqtt_metrics.erl index c9be6bdfc..05b9adcff 100644 --- a/src/emqx_mqtt_metrics.erl +++ b/src/emqx_mqtt_metrics.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqtt_props.erl b/src/emqx_mqtt_props.erl index 6001b5211..d928b8ad1 100644 --- a/src/emqx_mqtt_props.erl +++ b/src/emqx_mqtt_props.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqtt_rscode.erl b/src/emqx_mqtt_rscode.erl index b5658c17f..47450a5d3 100644 --- a/src/emqx_mqtt_rscode.erl +++ b/src/emqx_mqtt_rscode.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index c139a6bdd..eb334ed22 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_net.erl b/src/emqx_net.erl index ee186ee5a..45e05deda 100644 --- a/src/emqx_net.erl +++ b/src/emqx_net.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 39681f27f..3762aff7e 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_parser.erl b/src/emqx_parser.erl index a65b70894..f23b7e727 100644 --- a/src/emqx_parser.erl +++ b/src/emqx_parser.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 0ec84245f..e9d980ca3 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index e65f34c6c..cd66414fc 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index 45af0b81b..47bb4eaf0 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_pooler.erl b/src/emqx_pooler.erl index 71af6b279..943c0642f 100644 --- a/src/emqx_pooler.erl +++ b/src/emqx_pooler.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c2ab59be3..103fbf4ff 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -436,7 +436,7 @@ maybe_set_clientid(State) -> send_willmsg(_Client, undefined) -> ignore; send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> - emqx_server:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). + emqx_broker:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). start_keepalive(0, _State) -> ignore; diff --git a/src/emqx_pubsub.erl b/src/emqx_pubsub.erl deleted file mode 100644 index 9099afc83..000000000 --- a/src/emqx_pubsub.erl +++ /dev/null @@ -1,254 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_pubsub). - --behaviour(gen_server). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --export([start_link/3]). - -%% PubSub API. --export([subscribe/3, async_subscribe/3, publish/2, unsubscribe/3, - async_unsubscribe/3, subscribers/1]). - --export([dispatch/2]). - -%% gen_server Callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool, id, env}). - --define(PUBSUB, ?MODULE). - --define(is_local(Options), lists:member(local, Options)). - -%%-------------------------------------------------------------------- -%% Start PubSub -%%-------------------------------------------------------------------- - --spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id, Env) -> - gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). - -%%-------------------------------------------------------------------- -%% PubSub API -%%-------------------------------------------------------------------- - -%% @doc Subscribe to a Topic --spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok). -subscribe(Topic, Subscriber, Options) -> - call(pick(Topic), {subscribe, Topic, Subscriber, Options}). - --spec(async_subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok). -async_subscribe(Topic, Subscriber, Options) -> - cast(pick(Topic), {subscribe, Topic, Subscriber, Options}). - -%% @doc Publish MQTT Message to a Topic --spec(publish(binary(), mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -publish(Topic, Msg) -> - route(lists:append(emqx_router:match(Topic), - emqx_router:match_local(Topic)), delivery(Msg)). - -route([], #mqtt_delivery{message = Msg}) -> - emqx_hooks:run('message.dropped', [undefined, Msg]), - dropped(Msg#mqtt_message.topic), ignore; - -%% Dispatch on the local node. -route([#route{topic = To, node = Node}], - Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> - dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); - -%% Forward to other nodes -route([#route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> - forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]}); - -route(Routes, Delivery) -> - {ok, lists:foldl(fun(Route, Acc) -> - {ok, Acc1} = route([Route], Acc), Acc1 - end, Delivery, Routes)}. - -delivery(Msg) -> #mqtt_delivery{sender = self(), message = Msg, flows = []}. - -%% @doc Forward message to another node... -forward(Node, To, Delivery) -> - emqx_rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}. - -%% @doc Dispatch Message to Subscribers. --spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()). -dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> - case subscribers(Topic) of - [] -> - emqx_hooks:run('message.dropped', [undefined, Msg]), - dropped(Topic), {ok, Delivery}; - [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg), - {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}}; - Subscribers -> - Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], - lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), - {ok, Delivery#mqtt_delivery{flows = Flows1}} - end. - -%%TODO: Is SubPid aliving??? -dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> - SubPid ! {dispatch, Topic, Msg}; -dispatch({SubId, SubPid}, Topic, Msg) when is_binary(SubId), is_pid(SubPid) -> - SubPid ! {dispatch, Topic, Msg}; -dispatch({{share, _Share}, [Sub]}, Topic, Msg) -> - dispatch(Sub, Topic, Msg); -dispatch({{share, _Share}, []}, _Topic, _Msg) -> - ok; -dispatch({{share, _Share}, Subs}, Topic, Msg) -> %% round-robbin? - dispatch(lists:nth(rand:uniform(length(Subs)), Subs), Topic, Msg). - -subscribers(Topic) -> - group_by_share(try ets:lookup_element(mqtt_subscriber, Topic, 2) catch error:badarg -> [] end). - -group_by_share([]) -> []; - -group_by_share(Subscribers) -> - {Subs1, Shares1} = - lists:foldl(fun({share, Share, Sub}, {Subs, Shares}) -> - {Subs, dict:append({share, Share}, Sub, Shares)}; - (Sub, {Subs, Shares}) -> - {[Sub|Subs], Shares} - end, {[], dict:new()}, Subscribers), - lists:append(Subs1, dict:to_list(Shares1)). - -%% @private -%% @doc Ingore $SYS Messages. -dropped(<<"$SYS/", _/binary>>) -> - ok; -dropped(_Topic) -> - emqx_metrics:inc('messages/dropped'). - -%% @doc Unsubscribe --spec(unsubscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok). -unsubscribe(Topic, Subscriber, Options) -> - call(pick(Topic), {unsubscribe, Topic, Subscriber, Options}). - --spec(async_unsubscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok). -async_unsubscribe(Topic, Subscriber, Options) -> - cast(pick(Topic), {unsubscribe, Topic, Subscriber, Options}). - -call(PubSub, Req) when is_pid(PubSub) -> - gen_server:call(PubSub, Req, infinity). - -cast(PubSub, Msg) when is_pid(PubSub) -> - gen_server:cast(PubSub, Msg). - -pick(Topic) -> - gproc_pool:pick_worker(pubsub, Topic). - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id, Env]) -> - gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id, env = Env}, hibernate}. - -handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> - add_subscriber(Topic, Subscriber, Options), - reply(ok, setstats(State)); - -handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) -> - del_subscriber(Topic, Subscriber, Options), - reply(ok, setstats(State)); - -handle_call(Req, _From, State) -> - lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]), - {reply, ignore, State}. - -handle_cast({subscribe, Topic, Subscriber, Options}, State) -> - add_subscriber(Topic, Subscriber, Options), - noreply(setstats(State)); - -handle_cast({unsubscribe, Topic, Subscriber, Options}, State) -> - del_subscriber(Topic, Subscriber, Options), - noreply(setstats(State)); - -handle_cast(Msg, State) -> - lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), - {noreply, State}. - -handle_info(Info, State) -> - lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), - {noreply, State}. - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internel Functions -%%-------------------------------------------------------------------- - -add_subscriber(Topic, Subscriber, Options) -> - Share = proplists:get_value(share, Options), - case ?is_local(Options) of - false -> add_global_subscriber(Share, Topic, Subscriber); - true -> add_local_subscriber(Share, Topic, Subscriber) - end. - -add_global_subscriber(Share, Topic, Subscriber) -> - case ets:member(mqtt_subscriber, Topic) and emqx_router:has_route(Topic) of - true -> ok; - false -> emqx_router:add_route(Topic) - end, - ets:insert(mqtt_subscriber, {Topic, shared(Share, Subscriber)}). - -add_local_subscriber(Share, Topic, Subscriber) -> - (not ets:member(mqtt_subscriber, {local, Topic})) andalso emqx_router:add_local_route(Topic), - ets:insert(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}). - -del_subscriber(Topic, Subscriber, Options) -> - Share = proplists:get_value(share, Options), - case ?is_local(Options) of - false -> del_global_subscriber(Share, Topic, Subscriber); - true -> del_local_subscriber(Share, Topic, Subscriber) - end. - -del_global_subscriber(Share, Topic, Subscriber) -> - ets:delete_object(mqtt_subscriber, {Topic, shared(Share, Subscriber)}), - (not ets:member(mqtt_subscriber, Topic)) andalso emqx_router:del_route(Topic). - -del_local_subscriber(Share, Topic, Subscriber) -> - ets:delete_object(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}), - (not ets:member(mqtt_subscriber, {local, Topic})) andalso emqx_router:del_local_route(Topic). - -shared(undefined, Subscriber) -> - Subscriber; -shared(Share, Subscriber) -> - {share, Share, Subscriber}. - -setstats(State) -> - emqx_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)), - State. - -reply(Reply, State) -> - {reply, Reply, State}. - -noreply(State) -> - {noreply, State}. - diff --git a/src/emqx_router.erl b/src/emqx_router.erl index e46add4bf..3903dc005 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,35 +26,27 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --export([start_link/1]). - -%% For eunit tests --export([start/0, stop/0]). +%% Start +-export([start_link/2]). %% Topics --export([topics/0, local_topics/0]). +-export([topics/0]). -%% Route APIs --export([add_route/1, get_routes/1, del_route/1, has_route/1]). +%% Route Management APIs +-export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]). -%% Match and print --export([match/1, print/1]). +%% Match, print routes +-export([has_routes/1, match_routes/1, print_routes/1]). -%% Local Route API --export([get_local_routes/0, add_local_route/1, match_local/1, - del_local_route/1, clean_local_routes/0]). +-export([dump/0]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([dump/0]). +-record(state, {pool, id}). --record(state, {stats_fun, stats_timer}). - --define(ROUTER, ?MODULE). - --define(LOCK, {?ROUTER, clean_routes}). +-type(destination() :: node() | {binary(), node()}). %%-------------------------------------------------------------------- %% Mnesia Bootstrap @@ -68,14 +60,46 @@ mnesia(boot) -> {attributes, record_info(fields, route)}]); mnesia(copy) -> - ok = ekka_mnesia:copy_table(route, ram_copies). + ok = ekka_mnesia:copy_table(route). %%-------------------------------------------------------------------- -%% Start the Router +%% Start a router %%-------------------------------------------------------------------- -start_link(StatsFun) -> - gen_server:start_link({local, ?ROUTER}, ?MODULE, [StatsFun], []). +start_link(Pool, Id) -> + gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + +%%-------------------------------------------------------------------- +%% Add/Del Routes +%%-------------------------------------------------------------------- + +%% @doc Add a route +-spec(add_route(topic(), destination()) -> ok). +add_route(Topic, Dest) when is_binary(Topic) -> + cast(pick(Topic), {add_route, #route{topic = Topic, dest = Dest}}). + +-spec(add_route({pid(), reference()}, topic(), destination()) -> ok). +add_route(From, Topic, Dest) when is_binary(Topic) -> + cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}). + +%% @doc Get routes +-spec(get_routes(topic()) -> [route()]). +get_routes(Topic) -> + ets:lookup(route, Topic). + +%% @doc Delete a route +-spec(del_route(topic(), destination()) -> ok). +del_route(Topic, Dest) when is_binary(Topic) -> + cast(pick(Topic), {del_route, #route{topic = Topic, dest = Dest}}). + +-spec(del_route({pid(), reference()}, topic(), destination()) -> ok). +del_route(From, Topic, Dest) when is_binary(Topic) -> + cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}). + +%% @doc Has routes? +-spec(has_routes(topic()) -> boolean()). +has_routes(Topic) when is_binary(Topic) -> + ets:member(route, Topic). %%-------------------------------------------------------------------- %% Topics @@ -85,44 +109,116 @@ start_link(StatsFun) -> topics() -> mnesia:dirty_all_keys(route). --spec(local_topics() -> list(binary())). -local_topics() -> - ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]). - %%-------------------------------------------------------------------- -%% Match API +%% Match Routes %%-------------------------------------------------------------------- -%% @doc Match Routes. --spec(match(Topic:: binary()) -> [route()]). -match(Topic) when is_binary(Topic) -> +%% @doc Match routes +-spec(match_routes(Topic:: topic()) -> [{topic(), binary() | node()}]). +match_routes(Topic) when is_binary(Topic) -> %% Optimize: ets??? Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), %% Optimize: route table will be replicated to all nodes. - lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]). + aggre(lists:append([ets:lookup(route, To) || To <- [Topic | Matched]])). -%% @doc Print Routes. --spec(print(Topic :: binary()) -> [ok]). -print(Topic) -> - [io:format("~s -> ~s~n", [To, Node]) || - #route{topic = To, node = Node} <- match(Topic)]. +%% Aggregate routes +aggre([]) -> + []; +aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> + [{To, Node}]; +aggre([#route{topic = To, dest = {Group, _Node}}]) -> + [{To, Group}]; +aggre(Routes) -> + lists:foldl( + fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> + [{To, Node} | Acc]; + (#route{topic = To, dest = {Group, _}}, Acc) -> + lists:usort([{To, Group} | Acc]) + end, [], Routes). %%-------------------------------------------------------------------- -%% Route Management API +%% Print Routes %%-------------------------------------------------------------------- -%% @doc Add Route. --spec(add_route(binary() | route()) -> ok | {error, Reason :: term()}). -add_route(Topic) when is_binary(Topic) -> - add_route(#route{topic = Topic, node = node()}); -add_route(Route = #route{topic = Topic}) -> - case emqx_topic:wildcard(Topic) of - true -> case mnesia:is_transaction() of - true -> add_trie_route(Route); - false -> trans(fun add_trie_route/1, [Route]) - end; - false -> add_direct_route(Route) - end. +%% @doc Print routes to a topic +-spec(print_routes(topic()) -> ok). +print_routes(Topic) -> + lists:foreach(fun({To, Dest}) -> + io:format("~s -> ~s~n", [To, Dest]) + end, match_routes(Topic)). + +cast(Router, Msg) -> + gen_server:cast(Router, Msg). + +pick(Topic) -> + gproc_pool:pick_worker(router, Topic). + +%%FIXME: OOM? +dump() -> + [{route, [{To, Dest} || #route{topic = To, dest = Dest} <- ets:tab2list(route)]}]. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id]) -> + gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #state{pool = Pool, id = Id}}. + +handle_call(Req, _From, State) -> + emqx_log:error("[Router] Unexpected request: ~p", [Req]), + {reply, ignore, State}. + +handle_cast({add_route, From, Route}, State) -> + _ = handle_cast({add_route, Route}, State), + gen_server:reply(From, ok), + {noreply, State}; + +handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) -> + case lists:member(Route, ets:lookup(route, Topic)) of + true -> ok; + false -> + ok = emqx_router_helper:monitor(Dest), + case emqx_topic:wildcard(Topic) of + true -> trans(fun add_trie_route/1, [Route]); + false -> add_direct_route(Route) + end + end, + {noreply, State}; + +handle_cast({del_route, From, Route}, State) -> + _ = handle_cast({del_route, Route}, State), + gen_server:reply(From, ok), + {noreply, State}; + +handle_cast({del_route, Route = #route{topic = Topic}}, State) -> + %% Confirm if there are still subscribers... + case ets:member(subscriber, Topic) of + true -> ok; + false -> + case emqx_topic:wildcard(Topic) of + true -> trans(fun del_trie_route/1, [Route]); + false -> del_direct_route(Route) + end + end, + {noreply, State}; + +handle_cast(Msg, State) -> + emqx_log:error("[Router] Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- add_direct_route(Route) -> mnesia:async_dirty(fun mnesia:write/1, [Route]). @@ -134,24 +230,6 @@ add_trie_route(Route = #route{topic = Topic}) -> end, mnesia:write(Route). -%% @doc Lookup Routes --spec(get_routes(binary()) -> [route()]). -get_routes(Topic) -> - ets:lookup(route, Topic). - -%% @doc Delete Route --spec(del_route(binary() | route()) -> ok | {error, Reason :: term()}). -del_route(Topic) when is_binary(Topic) -> - del_route(#route{topic = Topic, node = node()}); -del_route(Route = #route{topic = Topic}) -> - case emqx_topic:wildcard(Topic) of - true -> case mnesia:is_transaction() of - true -> del_trie_route(Route); - false -> trans(fun del_trie_route/1, [Route]) - end; - false -> del_direct_route(Route) - end. - del_direct_route(Route) -> mnesia:async_dirty(fun mnesia:delete_object/1, [Route]). @@ -165,126 +243,13 @@ del_trie_route(Route = #route{topic = Topic}) -> [] -> ok end. -%% @doc Has route? --spec(has_route(binary()) -> boolean()). -has_route(Topic) when is_binary(Topic) -> - ets:member(route, Topic). - %% @private -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> case mnesia:transaction(Fun, Args) of {atomic, _} -> ok; - {aborted, Error} -> {error, Error} + {aborted, Error} -> + emqx_log:error("[Router] Mnesia aborted: ~p", [Error]), + {error, Error} end. -%%-------------------------------------------------------------------- -%% Local Route API -%%-------------------------------------------------------------------- - --spec(get_local_routes() -> list({binary(), node()})). -get_local_routes() -> - ets:tab2list(local_route). - --spec(add_local_route(binary()) -> ok). -add_local_route(Topic) -> - gen_server:call(?ROUTER, {add_local_route, Topic}). - --spec(del_local_route(binary()) -> ok). -del_local_route(Topic) -> - gen_server:call(?ROUTER, {del_local_route, Topic}). - --spec(match_local(binary()) -> [route()]). -match_local(Name) -> - case ets:info(local_route, size) of - 0 -> []; - _ -> ets:foldl( - fun({Filter, Node}, Matched) -> - case emqx_topic:match(Name, Filter) of - true -> [#route{topic = {local, Filter}, node = Node} | Matched]; - false -> Matched - end - end, [], local_route) - end. - --spec(clean_local_routes() -> ok). -clean_local_routes() -> - gen_server:call(?ROUTER, clean_local_routes). - -dump() -> - [{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}]. - -%% For unit test. -start() -> - gen_server:start({local, ?ROUTER}, ?MODULE, [], []). - -stop() -> - gen_server:call(?ROUTER, stop). - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- - -init([StatsFun]) -> - ekka:monitor(membership), - ets:new(local_route, [set, named_table, protected]), - {ok, TRef} = timer:send_interval(timer:seconds(1), stats), - {ok, #state{stats_fun = StatsFun, stats_timer = TRef}}. - -handle_call({add_local_route, Topic}, _From, State) -> - %% why node()...? - ets:insert(local_route, {Topic, node()}), - {reply, ok, State}; - -handle_call({del_local_route, Topic}, _From, State) -> - ets:delete(local_route, Topic), - {reply, ok, State}; - -handle_call(clean_local_routes, _From, State) -> - ets:delete_all_objects(local_route), - {reply, ok, State}; - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}; - -handle_call(_Req, _From, State) -> - {reply, ignore, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({membership, {mnesia, down, Node}}, State) -> - global:trans({?LOCK, self()}, fun() -> clean_routes_(Node) end), - handle_info(stats, State); - -handle_info({membership, _Event}, State) -> - %% ignore - {noreply, State}; - -handle_info(stats, State = #state{stats_fun = StatsFun}) -> - StatsFun(mnesia:table_info(route, size)), - {noreply, State, hibernate}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, #state{stats_timer = TRef}) -> - timer:cancel(TRef), - ekka:unmonitor(membership). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal Functions -%%-------------------------------------------------------------------- - -%% Clean routes on the down node. -clean_routes_(Node) -> - Pattern = #route{_ = '_', node = Node}, - Clean = fun() -> - [mnesia:delete_object(route, R, write) || - R <- mnesia:match_object(route, Pattern, write)] - end, - mnesia:transaction(Clean). - diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl new file mode 100644 index 000000000..97ba1811f --- /dev/null +++ b/src/emqx_router_helper.erl @@ -0,0 +1,158 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_helper). + +-behaviour(gen_server). + +-include("emqx.hrl"). + +%% Mnesia Bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% API +-export([start_link/1, monitor/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(routing_node, {name, ts}). + +-record(state, {nodes = [], stats_fun, stats_timer}). + +-compile({no_auto_import, [monitor/1]}). + +-define(SERVER, ?MODULE). + +-define(TABLE, routing_node). + +-define(LOCK, {?MODULE, clean_routes}). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?TABLE, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, routing_node}, + {attributes, record_info(fields, routing_node)}]); + +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TABLE). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +%% @doc Starts the router helper +-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). +start_link(StatsFun) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []). + +%% @doc Monitor routing node +-spec(monitor(node()) -> ok). +monitor({_Group, Node}) -> + monitor(Node); +monitor(Node) when is_atom(Node) -> + case ekka:is_member(Node) orelse ets:member(?TABLE, Node) of + true -> ok; + false -> mnesia:dirty_write(#routing_node{name = Node, ts = os:timestamp()}) + end. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([StatsFun]) -> + ekka:monitor(membership), + mnesia:subscribe({table, ?TABLE, simple}), + Nodes = lists:foldl( + fun(Node, Acc) -> + case ekka:is_member(Node) of + true -> Acc; + false -> _ = erlang:monitor_node(Node, true), + [Node | Acc] + end + end, [], mnesia:dirty_all_keys(?TABLE)), + {ok, TRef} = timer:send_interval(timer:seconds(1), stats), + {ok, #state{nodes = Nodes, stats_fun = StatsFun, stats_timer = TRef}}. + +handle_call(Req, _From, State) -> + emqx_log:error("[RouterHelper] Unexpected request: ~p", [Req]), + {reply, ignore, State}. + +handle_cast(Msg, State) -> + emqx_log:error("[RouterHelper] Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({mnesia_table_event, {write, #routing_node{name = Node}, _}}, + State = #state{nodes = Nodes}) -> + emqx_log:info("[RouterHelper] New routing node: ~s", [Node]), + case ekka:is_member(Node) orelse lists:member(Node, Nodes) of + true -> {noreply, State}; + false -> _ = erlang:monitor_node(Node, true), + {noreply, State#state{nodes = [Node | Nodes]}} + end; + +handle_info({mnesia_table_event, _Event}, State) -> + {noreply, State}; + +handle_info({nodedown, Node}, State = #state{nodes = Nodes}) -> + global:trans({?LOCK, self()}, + fun() -> + mnesia:transaction(fun clean_routes/1, [Node]) + end), + mnesia:dirty_delete(routing_node, Node), + handle_info(stats, State#state{nodes = lists:delete(Node, Nodes)}); + +handle_info({membership, {mnesia, down, Node}}, State) -> + handle_info({nodedown, Node}, State); + +handle_info({membership, _Event}, State) -> + {noreply, State}; + +handle_info(stats, State = #state{stats_fun = StatsFun}) -> + ok = StatsFun(mnesia:table_info(route, size)), + {noreply, State, hibernate}; + +handle_info(Info, State) -> + emqx_log:error("[RouteHelper] Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #state{stats_timer = TRef}) -> + timer:cancel(TRef), + ekka:unmonitor(membership), + mnesia:unsubscribe({table, ?TABLE, simple}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +clean_routes(Node) -> + Patterns = [#route{_ = '_', dest = Node}, + #route{_ = '_', dest = {'_', Node}}], + [mnesia:delete_object(R) || P <- Patterns, + R <- mnesia:match_object(P)]. + diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index ddcbfe097..bb3c59e3e 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -27,12 +27,15 @@ start_link() -> init([]) -> StatsFun = emqx_stats:statsfun('routes/count', 'routes/max'), - SupFlags = #{strategy => one_for_all, intensity => 1, period => 5}, - Router = #{id => emqx_router, - start => {emqx_router, start_link, [StatsFun]}, - restart => permanent, - shutdown => 30000, - type => worker, - modules => [emqx_router]}, - {ok, {SupFlags, [Router]}}. + + %% Router helper + Helper = {router_helper, {emqx_router_helper, start_link, [StatsFun]}, + permanent, 5000, worker, [emqx_router_helper]}, + + %% Router pool + PoolSup = emqx_pool_sup:spec(router_pool, + [router, hash, emqx_sys:schedulers(), + {emqx_router, start_link, []}]), + + {ok, {{one_for_all, 0, 3600}, [Helper, PoolSup]}}. diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index f4ef07809..a95dd7323 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_server.erl b/src/emqx_server.erl deleted file mode 100644 index 1f7ee65ba..000000000 --- a/src/emqx_server.erl +++ /dev/null @@ -1,326 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_server). - --behaviour(gen_server). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include("emqx_internal.hrl"). - --export([start_link/3]). - -%% PubSub API. --export([subscribe/1, subscribe/2, subscribe/3, publish/1, - unsubscribe/1, unsubscribe/2]). - -%% Async PubSub API. --export([async_subscribe/1, async_subscribe/2, async_subscribe/3, - async_unsubscribe/1, async_unsubscribe/2]). - -%% Management API. --export([setqos/3, subscriptions/1, subscribers/1, subscribed/2]). - -%% Debug API --export([dump/0]). - -%% gen_server Callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool, id, env, subids :: map(), submon :: emqx_pmon:pmon()}). - -%% @doc Start the server --spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id, Env) -> - gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). - -%%-------------------------------------------------------------------- -%% PubSub API -%%-------------------------------------------------------------------- - -%% @doc Subscribe to a Topic. --spec(subscribe(binary()) -> ok | {error, term()}). -subscribe(Topic) when is_binary(Topic) -> - subscribe(Topic, self()). - --spec(subscribe(binary(), emqx:subscriber()) -> ok | {error, term()}). -subscribe(Topic, Subscriber) when is_binary(Topic) -> - subscribe(Topic, Subscriber, []). - --spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> - ok | {error, term()}). -subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> - call(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}). - -%% @doc Subscribe to a Topic asynchronously. --spec(async_subscribe(binary()) -> ok). -async_subscribe(Topic) when is_binary(Topic) -> - async_subscribe(Topic, self()). - --spec(async_subscribe(binary(), emqx:subscriber()) -> ok). -async_subscribe(Topic, Subscriber) when is_binary(Topic) -> - async_subscribe(Topic, Subscriber, []). - --spec(async_subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok). -async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> - cast(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}). - -%% @doc Publish a message --spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -publish(Msg = #mqtt_message{from = From}) -> - trace(publish, From, Msg), - case emqx_hooks:run('message.publish', [], Msg) of - {ok, Msg1 = #mqtt_message{topic = Topic}} -> - emqx_pubsub:publish(Topic, Msg1); - {stop, Msg1} -> - lager:warning("Stop publishing: ~s", [emqx_message:format(Msg1)]), - ignore - end. - -%% @private -trace(publish, From, _Msg) when is_atom(From) -> - %% Dont' trace '$SYS' publish - ignore; -trace(publish, {ClientId, Username}, #mqtt_message{topic = Topic, payload = Payload}) -> - lager:info([{client, ClientId}, {topic, Topic}], - "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); -trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) when is_binary(From); is_list(From) -> - lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). - -%% @doc Unsubscribe --spec(unsubscribe(binary()) -> ok | {error, term()}). -unsubscribe(Topic) when is_binary(Topic) -> - unsubscribe(Topic, self()). - -%% @doc Unsubscribe --spec(unsubscribe(binary(), emqx:subscriber()) -> ok | {error, term()}). -unsubscribe(Topic, Subscriber) when is_binary(Topic) -> - call(pick(Subscriber), {unsubscribe, Topic, with_subpid(Subscriber)}). - -%% @doc Async Unsubscribe --spec(async_unsubscribe(binary()) -> ok). -async_unsubscribe(Topic) when is_binary(Topic) -> - async_unsubscribe(Topic, self()). - --spec(async_unsubscribe(binary(), emqx:subscriber()) -> ok). -async_unsubscribe(Topic, Subscriber) when is_binary(Topic) -> - cast(pick(Subscriber), {unsubscribe, Topic, with_subpid(Subscriber)}). - --spec(setqos(binary(), emqx:subscriber(), mqtt_qos()) -> ok). -setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> - call(pick(Subscriber), {setqos, Topic, with_subpid(Subscriber), Qos}). - -with_subpid(SubPid) when is_pid(SubPid) -> - SubPid; -with_subpid(SubId) when is_binary(SubId) -> - {SubId, self()}; -with_subpid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> - {SubId, SubPid}. - --spec(subscriptions(emqx:subscriber()) -> [{emqx:subscriber(), binary(), list(emqx:suboption())}]). -subscriptions(SubPid) when is_pid(SubPid) -> - with_subproperty(ets:lookup(mqtt_subscription, SubPid)); - -subscriptions(SubId) when is_binary(SubId) -> - with_subproperty(ets:match_object(mqtt_subscription, {{SubId, '_'}, '_'})); - -subscriptions({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> - with_subproperty(ets:lookup(mqtt_subscription, {SubId, SubPid})). - -with_subproperty({Subscriber, {share, _Share, Topic}}) -> - with_subproperty({Subscriber, Topic}); -with_subproperty({Subscriber, Topic}) -> - {Subscriber, Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}; -with_subproperty(Subscriptions) when is_list(Subscriptions) -> - [with_subproperty(Subscription) || Subscription <- Subscriptions]. - --spec(subscribers(binary()) -> list(emqx:subscriber())). -subscribers(Topic) when is_binary(Topic) -> - emqx_pubsub:subscribers(Topic). - --spec(subscribed(binary(), emqx:subscriber()) -> boolean()). -subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - ets:member(mqtt_subproperty, {Topic, SubPid}); -subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) -> - length(ets:match_object(mqtt_subproperty, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1; -subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) -> - ets:member(mqtt_subproperty, {Topic, {SubId, SubPid}}). - -call(Server, Req) -> - gen_server:call(Server, Req, infinity). - -cast(Server, Msg) when is_pid(Server) -> - gen_server:cast(Server, Msg). - -pick(SubPid) when is_pid(SubPid) -> - gproc_pool:pick_worker(server, SubPid); -pick(SubId) when is_binary(SubId) -> - gproc_pool:pick_worker(server, SubId); -pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> - pick(SubId). - -dump() -> - [{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_subproperty, mqtt_subscription, mqtt_subscriber]]. - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id, Env]) -> - gproc_pool:connect_worker(Pool, {Pool, Id}), - State = #state{pool = Pool, id = Id, env = Env, - subids = #{}, submon = emqx_pmon:new()}, - {ok, State, hibernate}. - -handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> - case do_subscribe(Topic, Subscriber, Options, State) of - {ok, NewState} -> reply(ok, setstats(NewState)); - {error, Error} -> reply({error, Error}, State) - end; - -handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - case do_unsubscribe(Topic, Subscriber, State) of - {ok, NewState} -> reply(ok, setstats(NewState)); - {error, Error} -> reply({error, Error}, State) - end; - -handle_call({setqos, Topic, Subscriber, Qos}, _From, State) -> - Key = {Topic, Subscriber}, - case ets:lookup(mqtt_subproperty, Key) of - [{_, Opts}] -> - Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts), - ets:insert(mqtt_subproperty, {Key, Opts1}), - reply(ok, State); - [] -> - reply({error, {subscription_not_found, Topic}}, State) - end; - -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast({subscribe, Topic, Subscriber, Options}, State) -> - case do_subscribe(Topic, Subscriber, Options, State) of - {ok, NewState} -> noreply(setstats(NewState)); - {error, _Error} -> noreply(State) - end; - -handle_cast({unsubscribe, Topic, Subscriber}, State) -> - case do_unsubscribe(Topic, Subscriber, State) of - {ok, NewState} -> noreply(setstats(NewState)); - {error, _Error} -> noreply(State) - end; - -handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). - -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{subids = SubIds}) -> - case maps:find(DownPid, SubIds) of - {ok, SubId} -> - clean_subscriber({SubId, DownPid}); - error -> - clean_subscriber(DownPid) - end, - noreply(setstats(demonitor_subscriber(DownPid, State))); - -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal Functions -%%-------------------------------------------------------------------- - -do_subscribe(Topic, Subscriber, Options, State) -> - case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of - [] -> - emqx_pubsub:async_subscribe(Topic, Subscriber, Options), - Share = proplists:get_value(share, Options), - add_subscription(Share, Subscriber, Topic), - ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}), - {ok, monitor_subscriber(Subscriber, State)}; - [_] -> - {error, {already_subscribed, Topic}} - end. - -add_subscription(undefined, Subscriber, Topic) -> - ets:insert(mqtt_subscription, {Subscriber, Topic}); -add_subscription(Share, Subscriber, Topic) -> - ets:insert(mqtt_subscription, {Subscriber, {share, Share, Topic}}). - -monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) -> - State#state{submon = SubMon:monitor(SubPid)}; -monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) -> - State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}. - -do_unsubscribe(Topic, Subscriber, State) -> - case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of - [{_, Options}] -> - emqx_pubsub:async_unsubscribe(Topic, Subscriber, Options), - Share = proplists:get_value(share, Options), - del_subscription(Share, Subscriber, Topic), - ets:delete(mqtt_subproperty, {Topic, Subscriber}), - {ok, State}; - [] -> - {error, {subscription_not_found, Topic}} - end. - -del_subscription(undefined, Subscriber, Topic) -> - ets:delete_object(mqtt_subscription, {Subscriber, Topic}); -del_subscription(Share, Subscriber, Topic) -> - ets:delete_object(mqtt_subscription, {Subscriber, {share, Share, Topic}}). - -clean_subscriber(Subscriber) -> - lists:foreach(fun({_, {share, Share, Topic}}) -> - clean_subscriber(Share, Subscriber, Topic); - ({_, Topic}) -> - clean_subscriber(undefined, Subscriber, Topic) - end, ets:lookup(mqtt_subscription, Subscriber)), - ets:delete(mqtt_subscription, Subscriber). - -clean_subscriber(Share, Subscriber, Topic) -> - case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of - [] -> - %% TODO:....??? - Options = if Share == undefined -> []; true -> [{share, Share}] end, - emqx_pubsub:async_unsubscribe(Topic, Subscriber, Options); - [{_, Options}] -> - emqx_pubsub:async_unsubscribe(Topic, Subscriber, Options), - ets:delete(mqtt_subproperty, {Topic, Subscriber}) - end. - -demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) -> - State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}. - -setstats(State) -> - emqx_stats:setstats('subscriptions/count', 'subscriptions/max', - ets:info(mqtt_subscription, size)), State. - -reply(Reply, State) -> - {reply, Reply, State}. - -noreply(State) -> - {noreply, State}. - diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 4f89e73ae..c8e069cd7 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -194,11 +194,11 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... -spec(publish(pid(), message()) -> ok | {error, term()}). publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> %% Publish QoS0 Directly - emqx_server:publish(Msg), ok; + emqx_broker:publish(Msg), ok; publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> %% Publish QoS1 message directly for client will PubAck automatically - emqx_server:publish(Msg), ok; + emqx_broker:publish(Msg), ok; publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> %% Publish QoS2 to Session @@ -365,7 +365,7 @@ handle_cast({subscribe, From, TopicTable, AckFun}, ?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State), SubMap; {ok, OldQos} -> - emqx_server:setqos(Topic, ClientId, NewQos), + emqx_broker:setopts(Topic, ClientId, [{qos, NewQos}]), emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}), ?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], State), @@ -438,7 +438,7 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> {Msg, AwaitingRel1} -> %% Implement Qos2 by method A [MQTT 4.33] %% Dispatch to subscriber when received PUBREL - spawn(emqx_server, publish, [Msg]), %%:) + emqx_broker:publish(Msg), %% FIXME: gc(State#state{awaiting_rel = AwaitingRel1}); error -> ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index 8a5fa8b2e..c7e18e1cd 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,7 +18,9 @@ -behavior(supervisor). --export([start_link/0, start_session/3]). +-include("emqx.hrl"). + +-export([start_link/0, start_session_process/1]). -export([init/1]). @@ -27,10 +29,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%% @doc Start a session --spec(start_session(boolean(), {binary(), binary() | undefined} , pid()) -> {ok, pid()}). -start_session(CleanSess, {ClientId, Username}, ClientPid) -> - supervisor:start_child(?MODULE, [CleanSess, {ClientId, Username}, ClientPid]). +%% @doc Start a session process +-spec(start_session_process(session()) -> {ok, pid()}). +start_session_process(Session) -> + supervisor:start_child(?MODULE, [Session]). %%-------------------------------------------------------------------- %% Supervisor callbacks diff --git a/src/emqx_shared_pubsub.erl b/src/emqx_shared_pubsub.erl new file mode 100644 index 000000000..bbc4d1930 --- /dev/null +++ b/src/emqx_shared_pubsub.erl @@ -0,0 +1,171 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_shared_pubsub). + +-behaviour(gen_server). + +-include("emqx.hrl"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% API +-export([start_link/0]). + +-export([strategy/0]). + +-export([subscribe/3, unsubscribe/3]). + +-export([dispatch/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(TABLE, shared_subscription). + +-record(state, {pmon}). + +-record(shared_subscription, {group, topic, subpid}). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?TABLE, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, shared_subscription}, + {attributes, record_info(fields, shared_subscription)}]); + +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TABLE). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +-spec(strategy() -> random | hash). +strategy() -> + application:get_env(emqx, load_balancing_strategy, random). + +subscribe(undefined, _Topic, _SubPid) -> + ok; +subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> + mnesia:dirty_write(r(Group, Topic, SubPid)), + gen_server:cast(?SERVER, {monitor, SubPid}). + +unsubscribe(undefined, _Topic, _SubPid) -> + ok; +unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> + mnesia:dirty_delete_object(r(Group, Topic, SubPid)). + +r(Group, Topic, SubPid) -> + #shared_subscription{group = Group, topic = Topic, subpid = SubPid}. + +dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> + case pick(subscribers(Group, Topic)) of + false -> Delivery; + SubPid -> SubPid ! {dispatch, Topic, Msg}, + Delivery#delivery{flows = [{dispatch, {Group, Topic}, 1} | Flows]} + end. + +pick([]) -> + false; +pick([SubPid]) -> + SubPid; +pick(SubPids) -> + X = abs(erlang:monotonic_time() + bxor erlang:unique_integer()), + lists:nth((X rem length(SubPids)) + 1, SubPids). + +subscribers(Group, Topic) -> + MP = {shared_subscription, Group, Topic, '$1'}, + ets:select(shared_subscription, [{MP, [], ['$1']}]). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + {atomic, PMon} = mnesia:transaction(fun init_monitors/0), + mnesia:subscribe({table, ?TABLE, simple}), + {ok, #state{pmon = PMon}}. + +init_monitors() -> + mnesia:foldl( + fun(#shared_subscription{subpid = SubPid}, Mon) -> + Mon:monitor(SubPid) + end, emqx_pmon:new(), ?TABLE). + +handle_call(Req, _From, State) -> + emqx_log:error("[Shared] Unexpected request: ~p", [Req]), + {reply, ignore, State}. + +handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> + {noreply, State#state{pmon = PMon:monitor(SubPid)}}; + +handle_cast(Msg, State) -> + emqx_log:error("[Shared] Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> + emqx_log:info("Shared subscription created: ~p", [NewRecord]), + #shared_subscription{subpid = SubPid} = NewRecord, + {noreply, State#state{pmon = PMon:monitor(SubPid)}}; + +handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> + emqx_log:info("Shared subscription deleted: ~p", [OldRecord]), + #shared_subscription{subpid = SubPid} = OldRecord, + {noreply, State#state{pmon = PMon:demonitor(SubPid)}}; + +handle_info({mnesia_table_event, _Event}, State) -> + {noreply, State}; + +handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> + emqx_log:info("Shared subscription down: ~p", [SubPid]), + mnesia:transaction(fun clean_down/1, [SubPid]), + {noreply, State#state{pmon = PMon:erase(SubPid)}}; + +handle_info(Info, State) -> + emqx_log:error("[Shared] Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + mnesia:unsubscribe({table, ?TABLE, simple}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +clean_down(SubPid) -> + MP = #shared_subscription{_ = '_', subpid = SubPid}, + lists:foreach(fun mnesia:delete_object/1, mnesia:match_object(MP)). + diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 30983d0b5..b90dd693b 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ %% API Function Exports -export([start_link/2]). --export([start_session/2, lookup_session/1, register_session/3, +-export([open_session/1, start_session/2, lookup_session/1, register_session/3, unregister_session/1, unregister_session/2]). -export([dispatch/3]). @@ -47,7 +47,7 @@ -define(TIMEOUT, 120000). -define(LOG(Level, Format, Args, Session), - lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])). + lager:Level("SM(~s): " ++ Format, [Session#session.client_id | Args])). %%-------------------------------------------------------------------- %% Mnesia callbacks @@ -55,7 +55,7 @@ mnesia(boot) -> %% Global Session Table - ok = ekka_mnesia:create_table(mqtt_session, [ + ok = ekka_mnesia:create_table(session, [ {type, set}, {ram_copies, [node()]}, {record_name, mqtt_session}, @@ -68,6 +68,49 @@ mnesia(copy) -> %% API %%-------------------------------------------------------------------- +%% Open a clean start session. +open_session(Session = #{client_id := ClientId, clean_start := true, expiry_interval := Interval}) -> + with_lock(ClientId, + fun() -> + {ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]), + io:format("ResL: ~p, BadNodes: ~p~n", [ResL, BadNodes]), + case Interval > 0 of + true -> + {ok, emqx_session_sup:start_session_process(Session)}; + false -> + {ok, emqx_session:init_state(Session)} + end + end). + +open_session(Session = #{client_id := ClientId, clean_start := false, expiry_interval := Interval}) -> + with_lock(ClientId, + fun() -> + {ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]), + [SessionPid | _] = lists:flatten(ResL), + + + + end). + +lookup_session(ClientId) -> + ets:lookup(session, ClientId). + + +lookup_session(ClientId) -> + ets:lookup(session, ClientId). + +with_lock(undefined, Fun) -> + Fun(); + + +with_lock(ClientId, Fun) -> + case emqx_sm_locker:lock(ClientId) of + true -> Result = Fun(), + ok = emqx_sm_locker:unlock(ClientId), + Result; + false -> {error, client_id_unavailable} + end. + %% @doc Start a session manager -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> @@ -92,7 +135,7 @@ lookup_session(ClientId) -> register_session(ClientId, CleanSess, Properties) -> ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). -%% @doc Unregister a session. +%% @doc Unregister a Session. -spec(unregister_session(binary()) -> boolean()). unregister_session(ClientId) -> unregister_session(ClientId, self()). diff --git a/src/emqx_sm_helper.erl b/src/emqx_sm_helper.erl index 6751fd54d..068156b94 100644 --- a/src/emqx_sm_helper.erl +++ b/src/emqx_sm_helper.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_sm_locker.erl b/src/emqx_sm_locker.erl new file mode 100644 index 000000000..53af8a678 --- /dev/null +++ b/src/emqx_sm_locker.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_sm_locker). + +-include("emqx.hrl"). + +-export([start_link/0]). + +%% Lock/Unlock API based on canal-lock. +-export([lock/1, unlock/1]). + +%% @doc Starts the lock server +-spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +start_link() -> + canal_lock:start_link(?MODULE, 1). + +%% @doc Lock a clientid +-spec(lock(client_id()) -> boolean()). +lock(ClientId) -> + case canal_lock:acquire(?MODULE, ClientId, 1, 1) of + {acquired, 1} -> true; + full -> false + end. + +%% @doc Unlock a clientid +-spec(unlock(client_id()) -> ok). +unlock(ClientId) -> + canal_lock:release(?MODULE, ClientId, 1, 1). + diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index feef64e2b..1b032a5ae 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 1fbc7b245..219914997 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ set_session_stats/2, get_session_stats/1, del_session_stats/1]). %% Statistics API. --export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstats/3]). +-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -124,10 +124,10 @@ statsfun(Stat) -> fun(Val) -> setstat(Stat, Val) end. -spec(statsfun(Stat :: atom(), MaxStat :: atom()) -> fun()). -statsfun(Stat, MaxStat) -> - fun(Val) -> setstats(Stat, MaxStat, Val) end. +statsfun(Stat, MaxStat) -> + fun(Val) -> setstat(Stat, MaxStat, Val) end. -%% @doc Get broker statistics +%% @doc Get all statistics -spec(getstats() -> [{atom(), non_neg_integer()}]). getstats() -> lists:sort(ets:tab2list(?STATS_TAB)). @@ -146,9 +146,9 @@ setstat(Stat, Val) -> ets:update_element(?STATS_TAB, Stat, {2, Val}). %% @doc Set stats with max --spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()). -setstats(Stat, MaxStat, Val) -> - gen_server:cast(?MODULE, {setstats, Stat, MaxStat, Val}). +-spec(setstat(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()). +setstat(Stat, MaxStat, Val) -> + gen_server:cast(?MODULE, {setstat, Stat, MaxStat, Val}). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -172,7 +172,7 @@ handle_call(_Request, _From, State) -> {reply, error, State}. %% atomic -handle_cast({setstats, Stat, MaxStat, Val}, State) -> +handle_cast({setstat, Stat, MaxStat, Val}, State) -> MaxVal = ets:lookup_element(?STATS_TAB, MaxStat, 2), if Val > MaxVal -> diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index e115674f9..b1e229ed6 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ start_link() -> supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). - -spec(start_child(supervisor:child_spec()) -> startchild_ret()). start_child(ChildSpec) when is_tuple(ChildSpec) -> supervisor:start_child(?SUPERVISOR, ChildSpec). @@ -58,17 +57,16 @@ init([]) -> {ok, {{one_for_all, 0, 1}, [?CHILD(emqx_ctl, worker), ?CHILD(emqx_hooks, worker), - ?CHILD(emqx_router, worker), - ?CHILD(emqx_pubsub_sup, supervisor), ?CHILD(emqx_stats, worker), ?CHILD(emqx_metrics, worker), + ?CHILD(emqx_router_sup, supervisor), + ?CHILD(emqx_broker_sup, supervisor), ?CHILD(emqx_pooler, supervisor), ?CHILD(emqx_trace_sup, supervisor), ?CHILD(emqx_cm_sup, supervisor), ?CHILD(emqx_sm_sup, supervisor), ?CHILD(emqx_session_sup, supervisor), - ?CHILD(emqx_ws_client_sup, supervisor), - ?CHILD(emqx_broker, worker), + ?CHILD(emqx_ws_connection_sup, supervisor), ?CHILD(emqx_alarm, worker), ?CHILD(emqx_mod_sup, supervisor), ?CHILD(emqx_bridge_sup_sup, supervisor), diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl new file mode 100644 index 000000000..3c2ba1e3e --- /dev/null +++ b/src/emqx_sys.erl @@ -0,0 +1,177 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_sys). + +-behaviour(gen_server). + +-include("emqx.hrl"). + +-export([start_link/0]). + +-export([schedulers/0]). + +-export([version/0, uptime/0, datetime/0, sysdescr/0, sys_interval/0]). + +-export([info/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {started_at, heartbeat, sys_ticker, version, sysdescr}). + +-define(APP, emqx). + +-define(SERVER, ?MODULE). + +%% $SYS Topics of Broker +-define(SYSTOP_BROKERS, [ + version, % Broker version + uptime, % Broker uptime + datetime, % Broker local datetime + sysdescr % Broker description +]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%% @doc Get schedulers +-spec(schedulers() -> pos_integer()). +schedulers() -> + erlang:system_info(schedulers). + +%% @doc Get sys version +-spec(version() -> string()). +version() -> + {ok, Version} = application:get_key(?APP, vsn), Version. + +%% @doc Get sys description +-spec(sysdescr() -> string()). +sysdescr() -> + {ok, Descr} = application:get_key(?APP, description), Descr. + +%% @doc Get sys uptime +-spec(uptime() -> string()). +uptime() -> gen_server:call(?SERVER, uptime). + +%% @doc Get sys datetime +-spec(datetime() -> string()). +datetime() -> + {{Y, M, D}, {H, MM, S}} = calendar:local_time(), + lists:flatten( + io_lib:format( + "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). + +sys_interval() -> + application:get_env(?APP, sys_interval, 60000). + +%% @doc Get sys info +-spec(info() -> list(tuple())). +info() -> + [{version, version()}, + {sysdescr, sysdescr()}, + {uptime, uptime()}, + {datetime, datetime()}]. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + Tick = fun(I, M) -> + {ok, TRef} = timer:send_interval(I, M), TRef + end, + {ok, #state{started_at = os:timestamp(), + heartbeat = Tick(1000, heartbeat), + sys_ticker = Tick(sys_interval(), tick), + version = iolist_to_binary(version()), + sysdescr = iolist_to_binary(sysdescr())}, hibernate}. + +handle_call(uptime, _From, State) -> + {reply, uptime(State), State}; + +handle_call(Req, _From, State) -> + emqx_log:error("[SYS] Unexpected request: ~p", [Req]), + {reply, ignore, State}. + +handle_cast(Msg, State) -> + emqx_log:error("[SYS] Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info(heartbeat, State) -> + publish(uptime, iolist_to_binary(uptime(State))), + publish(datetime, iolist_to_binary(datetime())), + {noreply, State, hibernate}; + +handle_info(tick, State = #state{version = Version, sysdescr = Descr}) -> + retain(brokers), + retain(version, Version), + retain(sysdescr, Descr), + {noreply, State, hibernate}; + +handle_info(Info, State) -> + emqx_log:error("[SYS] Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #state{heartbeat = Hb, sys_ticker = TRef}) -> + timer:cancel(Hb), + timer:cancel(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +retain(brokers) -> + Payload = list_to_binary(string:join([atom_to_list(N) || + N <- ekka_mnesia:running_nodes()], ",")), + Msg = emqx_message:make(broker, <<"$SYS/brokers">>, Payload), + emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))). + +retain(Topic, Payload) when is_binary(Payload) -> + Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload), + emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))). + +publish(Topic, Payload) when is_binary(Payload) -> + Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload), + emqx:publish(emqx_message:set_flag(sys, Msg)). + +uptime(#state{started_at = Ts}) -> + Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, + lists:flatten(uptime(seconds, Secs)). + +uptime(seconds, Secs) when Secs < 60 -> + [integer_to_list(Secs), " seconds"]; +uptime(seconds, Secs) -> + [uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"]; +uptime(minutes, M) when M < 60 -> + [integer_to_list(M), " minutes, "]; +uptime(minutes, M) -> + [uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "]; +uptime(hours, H) when H < 24 -> + [integer_to_list(H), " hours, "]; +uptime(hours, H) -> + [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "]; +uptime(days, D) -> + [integer_to_list(D), " days,"]. + diff --git a/src/emqx_sysmon.erl b/src/emqx_sysmon.erl index 7f4a2490d..7fc270a6a 100644 --- a/src/emqx_sysmon.erl +++ b/src/emqx_sysmon.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_sysmon_sup.erl b/src/emqx_sysmon_sup.erl index 6bcbf76c8..d4aa3aa98 100644 --- a/src/emqx_sysmon_sup.erl +++ b/src/emqx_sysmon_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_time.erl b/src/emqx_time.erl index d954c8204..4a92e12ae 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 267786943..bc7c52f9e 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ -module(emqx_topic). +-include("emqx.hrl"). + -include("emqx_mqtt.hrl"). -import(lists, [reverse/1]). @@ -26,9 +28,7 @@ -export([parse/1, parse/2]). --type(topic() :: binary()). - --type(option() :: local | {qos, mqtt_qos()} | {share, '$queue' | binary()}). +-type(option() :: {qos, mqtt_qos()} | {share, '$queue' | binary()}). -type(word() :: '' | '+' | '#' | binary()). @@ -36,7 +36,7 @@ -type(triple() :: {root | binary(), word(), binary()}). --export_type([topic/0, option/0, word/0, triple/0]). +-export_type([option/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 4096). @@ -101,7 +101,7 @@ validate2([''|Words]) -> validate2(['+'|Words]) -> validate2(Words); validate2([W|Words]) -> - case validate3(W) of true -> validate2(Words); false -> false end. + validate3(W) andalso validate2(Words). validate3(<<>>) -> true; @@ -177,39 +177,24 @@ join(Words) -> parse(Topic) when is_binary(Topic) -> parse(Topic, []). -parse(<<"$local/", Topic1/binary>>, Options) -> - if_not_contain(local, Options, fun() -> - parse(Topic1, [local | Options]) - end); - -parse(<<"$fastlane/", Topic1/binary>>, Options) -> - if_not_contain(fastlane, Options, fun() -> - parse(Topic1, [fastlane | Options]) - end); - -parse(<<"$queue/", Topic1/binary>>, Options) -> - if_not_contain(share, Options,fun() -> - parse(Topic1, [{share, '$queue'} | Options]) - end); - -parse(<<"$share/", Topic1/binary>>, Options) -> - if_not_contain(share, Options, fun() -> - [Share, Topic2] = binary:split(Topic1, <<"/">>), - {Topic2, [{share, Share} | Options]} - end); - -parse(Topic, Options) -> - {Topic, Options}. - -if_not_contain(Key, Options, Fun) when Key == local; Key == fastlane -> - case lists:member(Key, Options) of - true -> error(invalid_topic); - false -> Fun() +parse(Topic = <<"$fastlane/", Topic1/binary>>, Options) -> + case lists:member(fastlane, Options) of + true -> error({invalid_topic, Topic}); + false -> parse(Topic1, [fastlane | Options]) end; -if_not_contain(share, Options, Fun) -> +parse(Topic = <<"$queue/", Topic1/binary>>, Options) -> case lists:keyfind(share, 1, Options) of - true -> error(invalid_topic); - false -> Fun() - end. + {share, _} -> error({invalid_topic, Topic}); + false -> parse(Topic1, [{share, '$queue'} | Options]) + end; + +parse(Topic = <<"$share/", Topic1/binary>>, Options) -> + case lists:keyfind(share, 1, Options) of + {share, _} -> error({invalid_topic, Topic}); + false -> [Group, Topic2] = binary:split(Topic1, <<"/">>), + {Topic2, [{share, Group} | Options]} + end; + +parse(Topic, Options) -> {Topic, Options}. diff --git a/src/emqx_trace.erl b/src/emqx_trace.erl index 99da331f7..fc68b3cb5 100644 --- a/src/emqx_trace.erl +++ b/src/emqx_trace.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,13 +14,16 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_trace). +-module(emqx_tracer). -behaviour(gen_server). -%% API Function Exports +-include("emqx.hrl"). + -export([start_link/0]). +-export([trace/3]). + -export([start_trace/2, stop_trace/1, all_traces/0]). %% gen_server Function Exports @@ -31,16 +34,36 @@ -type(trace_who() :: {client | topic, binary()}). --define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). +-define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). %%-------------------------------------------------------------------- -%% API +%% Start the tracer %%-------------------------------------------------------------------- -spec(start_link() -> {ok, pid()}). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +%%-------------------------------------------------------------------- +%% Trace +%%-------------------------------------------------------------------- + +trace(publish, From, _Msg) when is_atom(From) -> + %% Dont' trace '$SYS' publish + ignore; +trace(publish, {ClientId, Username}, #message{topic = Topic, payload = Payload}) -> + lager:info([{client, ClientId}, {topic, Topic}], + "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); +trace(publish, From, #message{topic = Topic, payload = Payload}) + when is_binary(From); is_list(From) -> + lager:info([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + + +%%-------------------------------------------------------------------- +%% Start/Stop Trace +%%-------------------------------------------------------------------- + %% @doc Start to trace client or topic. -spec(start_trace(trace_who(), string()) -> ok | {error, term()}). start_trace({client, ClientId}, LogFile) -> @@ -67,10 +90,10 @@ all_traces() -> gen_server:call(?MODULE, all_traces). %%-------------------------------------------------------------------- init([]) -> - {ok, #state{level = debug, traces = #{}}}. + {ok, #state{level = emqx:env(trace_level, debug), traces = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> - case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of + case lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of {ok, exists} -> {reply, {error, existed}, State}; {ok, Trace} -> @@ -96,15 +119,15 @@ handle_call(all_traces, _From, State = #state{traces = Traces}) -> <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> - lager:error("[TRACE] Unexpected Call: ~p", [Req]), + emqx_log:error("[TRACE] Unexpected Call: ~p", [Req]), {reply, ignore, State}. handle_cast(Msg, State) -> - lager:error("[TRACE] Unexpected Cast: ~p", [Msg]), + emqx_log:error("[TRACE] Unexpected Cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - lager:error("[TRACE] Unexpected Info: ~p", [Info]), + emqx_log:error("[TRACE] Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_trace_sup.erl b/src/emqx_trace_sup.erl index 861b3ef79..e5f7bc2f2 100644 --- a/src/emqx_trace_sup.erl +++ b/src/emqx_trace_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index ba2ead2d2..9f13ba9d2 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ -include("emqx.hrl"). -%% Mnesia Callbacks +%% Mnesia bootstrap -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). @@ -28,10 +28,10 @@ -export([insert/1, match/1, lookup/1, delete/1]). %%-------------------------------------------------------------------- -%% Mnesia Callbacks +%% Mnesia Bootstrap %%-------------------------------------------------------------------- -%% @doc Create or Replicate trie tables. +%% @doc Create or replicate trie tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> %% Trie Table @@ -55,8 +55,8 @@ mnesia(copy) -> %% Trie API %%-------------------------------------------------------------------- -%% @doc Insert topic to trie --spec(insert(Topic :: binary()) -> ok). +%% @doc Insert a topic into the trie +-spec(insert(Topic :: topic()) -> ok). insert(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of [#trie_node{topic = Topic}] -> @@ -64,25 +64,25 @@ insert(Topic) when is_binary(Topic) -> [TrieNode = #trie_node{topic = undefined}] -> write_trie_node(TrieNode#trie_node{topic = Topic}); [] -> - % Add trie path + %% Add trie path lists:foreach(fun add_path/1, emqx_topic:triples(Topic)), - % Add last node + %% Add last node write_trie_node(#trie_node{node_id = Topic, topic = Topic}) end. -%% @doc Find trie nodes that match topic --spec(match(Topic :: binary()) -> list(MatchedTopic :: binary())). +%% @doc Find trie nodes that match the topic +-spec(match(Topic :: topic()) -> list(MatchedTopic :: topic())). match(Topic) when is_binary(Topic) -> TrieNodes = match_node(root, emqx_topic:words(Topic)), [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined]. -%% @doc Lookup a Trie Node +%% @doc Lookup a trie node -spec(lookup(NodeId :: binary()) -> [#trie_node{}]). lookup(NodeId) -> mnesia:read(trie_node, NodeId). -%% @doc Delete topic from trie --spec(delete(Topic :: binary()) -> ok). +%% @doc Delete a topic from the trie +-spec(delete(Topic :: topic()) -> ok). delete(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of [#trie_node{edge_count = 0}] -> @@ -95,11 +95,11 @@ delete(Topic) when is_binary(Topic) -> end. %%-------------------------------------------------------------------- -%% Internal Functions +%% Internal functions %%-------------------------------------------------------------------- %% @private -%% @doc Add path to trie tree. +%% @doc Add a path to the trie. add_path({Node, Word, Child}) -> Edge = #trie_edge{node_id = Node, word = Word}, case mnesia:read(trie_node, Node) of @@ -146,7 +146,7 @@ match_node(NodeId, [W|Words], ResAcc) -> end. %% @private -%% @doc Delete paths from trie tree. +%% @doc Delete paths from the trie. delete_path([]) -> ok; delete_path([{NodeId, Word, _} | RestPath]) -> diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index 32959cc0c..0bf7a6c30 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index e854bd1cd..21b4ffcec 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 61e717a5c..bee19841a 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_ws_conn). +-module(emqx_ws_connection). -behaviour(gen_server). diff --git a/src/emqx_ws_connection_sup.erl b/src/emqx_ws_connection_sup.erl index f1a26a91b..330272102 100644 --- a/src/emqx_ws_connection_sup.erl +++ b/src/emqx_ws_connection_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_ws_conn_sup). +-module(emqx_ws_connection_sup). -behavior(supervisor). @@ -39,6 +39,6 @@ init([]) -> %%TODO: Cannot upgrade the environments, Use zone? Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), {ok, {{simple_one_for_one, 0, 1}, - [{ws_conn, {emqx_ws_conn, start_link, [Env]}, - temporary, 5000, worker, [emqx_ws_conn]}]}}. + [{ws_connection, {emqx_ws_connection, start_link, [Env]}, + temporary, 5000, worker, [emqx_ws_connection]}]}}.