diff --git a/Makefile b/Makefile index 49642f180..d42673d93 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ dep_gproc = git https://github.com/uwiger/gproc.git dep_lager = git https://github.com/basho/lager.git dep_gen_conf = git https://github.com/emqtt/gen_conf.git dep_gen_logger = git https://github.com/emqtt/gen_logger.git -dep_esockd = git https://github.com/emqtt/esockd.git udp +dep_esockd = git https://github.com/emqtt/esockd.git emq20 dep_mochiweb = git https://github.com/emqtt/mochiweb.git ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/docs/source/changes.rst b/docs/source/changes.rst index 33c400101..7616f8b76 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -5,11 +5,44 @@ Changes ======= -.. _release_2.0: +.. _release_2.0_beta.2: -------------------------------------- -Version 2.0-beta1 (West of West Lake) -------------------------------------- +------------------ +Version 2.0-beta.2 +------------------ + +*Release Date: 2016-09-10* + +CoAP Support +------------ + +Release an experimental CoAP Gateway: https://github.com/emqtt/emqttd_coap + +API Breaking Changes +-------------------- + +'$u', '$c' variables in emqttd.conf and modules/acl.conf changed to '%u', '%c' + +Improve the design of mqtt retained message, replace emqttd_retainer with emqttd_mod_retainer. + +Add 'session.subscribed', 'session.unsubscribed' hooks, remove 'client.subscribe.after' hook + +Tab 'retained_message' -> 'mqtt_retained' + +Bugfix +------ + +[2.0 beta1] FORMAT ERROR: "~s PUBLISH to ~s: ~p" (PR #671) + +Fixing issues in cluster mode. (PR #681) + +Fixing issues with unsubscribe hook (PR #673) + +.. _release_2.0_beta.1: + +------------------ +Version 2.0-beta.1 +------------------ *Release Date: 2016-08-30* diff --git a/docs/source/design.rst b/docs/source/design.rst index adc1f10b8..8e5ea8852 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -337,10 +337,12 @@ Hooks defined by the emqttd 1.0 broker: +------------------------+------------------------------------------------------+ | client.subscribe | Run before client subscribes topics | +------------------------+------------------------------------------------------+ -| client.subscribe.after | Run After client subscribed topics | -+------------------------+------------------------------------------------------+ | client.unsubscribe | Run when client unsubscribes topics | +------------------------+------------------------------------------------------+ +| session.subscribed | Run After client(session) subscribed a topic | ++------------------------+------------------------------------------------------+ +| session.unsubscribed | Run After client(session) unsubscribed a topic | ++------------------------+------------------------------------------------------+ | message.publish | Run when a MQTT message is published | +------------------------+------------------------------------------------------+ | message.delivered | Run when a MQTT message is delivered | @@ -517,4 +519,6 @@ Mnesia/ETS Tables +--------------------+--------+----------------------------------------+ | mqtt_client | ets | Client Table | +--------------------+--------+----------------------------------------+ +| mqtt_retained | mnesia | Retained Message Table | ++--------------------+--------+----------------------------------------+ diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index ab4543e06..aecc1966d 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -812,26 +812,28 @@ Register Callbacks for Hooks The plugin could register callbacks for hooks. The hooks will be run by the broker when a client connected/disconnected, a topic subscribed/unsubscribed or a message published/delivered: -+------------------------+---------------------------------------+ -| Name | Description | -+------------------------+---------------------------------------+ -| client.connected | Run when a client connected to the | -| | broker successfully | -+------------------------+---------------------------------------+ -| client.subscribe | Run before a client subscribes topics | -+------------------------+---------------------------------------+ -| client.subscribe.after | Run after a client subscribed topics | -+------------------------+---------------------------------------+ -| client.unsubscribe | Run when a client unsubscribes topics | -+------------------------+---------------------------------------+ -| message.publish | Run when a message is published | -+------------------------+---------------------------------------+ -| message.delivered | Run when a message is delivered | -+------------------------+---------------------------------------+ -| message.acked | Run when a message(qos1/2) is acked | -+------------------------+---------------------------------------+ -| client.disconnected | Run when a client is disconnnected | -+------------------------+---------------------------------------+ ++------------------------+-----------------------------------------+ +| Name | Description | ++------------------------+-----------------------------------------+ +| client.connected | Run when a client connected to the | +| | broker successfully | ++------------------------+-----------------------------------------+ +| client.subscribe | Run before a client subscribes topics | ++------------------------+-----------------------------------------+ +| client.unsubscribe | Run when a client unsubscribes topics | ++------------------------+-----------------------------------------+ +| session.subscribed | Run after a client subscribed a topic | ++------------------------+-----------------------------------------+ +| session.unsubscribed | Run after a client unsubscribed a topic | ++------------------------+-----------------------------------------+ +| message.publish | Run when a message is published | ++------------------------+-----------------------------------------+ +| message.delivered | Run when a message is delivered | ++------------------------+-----------------------------------------+ +| message.acked | Run when a message(qos1/2) is acked | ++------------------------+-----------------------------------------+ +| client.disconnected | Run when a client is disconnnected | ++------------------------+-----------------------------------------+ emqttd_plugin_template.erl for example: @@ -841,13 +843,13 @@ emqttd_plugin_template.erl for example: load(Env) -> emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]), - emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]), - emqttd:hook('client.subscribe.after', fun ?MODULE:on_client_subscribe_after/3, [Env]), - emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]), + emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]), + emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]), + emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]), + emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]), emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]), - emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]), - emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]). - + emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]), + emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]). Register CLI Modules -------------------- @@ -881,10 +883,10 @@ There will be a new CLI after the plugin loaded:: .. _emqttd_dashboard: https://github.com/emqtt/emqttd_dashboard .. _emqttd_auth_ldap: https://github.com/emqtt/emqttd_auth_ldap .. _emqttd_auth_http: https://github.com/emqtt/emqttd_auth_http -.. _emqttd_auth_mysql: https://github.com/emqtt/emqttd_plugin_mysql -.. _emqttd_auth_pgsql: https://github.com/emqtt/emqttd_plugin_pgsql -.. _emqttd_auth_redis: https://github.com/emqtt/emqttd_plugin_redis -.. _emqttd_auth_mongo: https://github.com/emqtt/emqttd_plugin_mongo +.. _emqttd_auth_mysql: https://github.com/emqtt/emqttd_auth_mysql +.. _emqttd_auth_pgsql: https://github.com/emqtt/emqttd_auth_pgsql +.. _emqttd_auth_redis: https://github.com/emqtt/emqttd_auth_redis +.. _emqttd_auth_mongo: https://github.com/emqtt/emqttd_auth_mongo .. _emqttd_sn: https://github.com/emqtt/emqttd_sn .. _emqttd_stomp: https://github.com/emqtt/emqttd_stomp .. _emqttd_sockjs: https://github.com/emqtt/emqttd_sockjs diff --git a/rebar.config b/rebar.config index 5306e0ae0..aedddfbe9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","udp"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}} +{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index 252d77d78..5221969de 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -62,10 +62,10 @@ start() -> application:start(?APP). %% @doc Get Config -spec(conf(Key :: atom()) -> any()). -conf(Key) -> gen_conf:value(?APP, Key). +conf(Key) -> emqttd_conf:value(Key). -spec(conf(Key :: atom(), Default :: any()) -> any()). -conf(Key, Default) -> gen_conf:value(?APP, Key, Default). +conf(Key, Default) -> emqttd_conf:value(Key, Default). %% @doc Environment -spec(env(Key:: atom()) -> any()). diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 6b0e08d75..88395b182 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -75,8 +75,8 @@ compile(topic, Topic) -> end. 'pattern?'(Words) -> - lists:member(<<"$u">>, Words) - orelse lists:member(<<"$c">>, Words). + lists:member(<<"%u">>, Words) + orelse lists:member(<<"%c">>, Words). bin(L) when is_list(L) -> list_to_binary(L); @@ -142,13 +142,13 @@ feed_var(Client, Pattern) -> feed_var(Client, Pattern, []). feed_var(_Client, [], Acc) -> lists:reverse(Acc); -feed_var(Client = #mqtt_client{client_id = undefined}, [<<"$c">>|Words], Acc) -> - feed_var(Client, Words, [<<"$c">>|Acc]); -feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"$c">>|Words], Acc) -> +feed_var(Client = #mqtt_client{client_id = undefined}, [<<"%c">>|Words], Acc) -> + feed_var(Client, Words, [<<"%c">>|Acc]); +feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"%c">>|Words], Acc) -> feed_var(Client, Words, [ClientId |Acc]); -feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) -> - feed_var(Client, Words, [<<"$u">>|Acc]); -feed_var(Client = #mqtt_client{username = Username}, [<<"$u">>|Words], Acc) -> +feed_var(Client = #mqtt_client{username = undefined}, [<<"%u">>|Words], Acc) -> + feed_var(Client, Words, [<<"%u">>|Acc]); +feed_var(Client = #mqtt_client{username = Username}, [<<"%u">>|Words], Acc) -> feed_var(Client, Words, [Username|Acc]); feed_var(Client, [W|Words], Acc) -> feed_var(Client, Words, [W|Acc]). diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 2db44c351..81f1d329c 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -42,7 +42,7 @@ Reason :: term()). start(_StartType, _StartArgs) -> print_banner(), - gen_conf:init(emqttd), + emqttd_conf:init(), emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), @@ -81,7 +81,6 @@ start_servers(Sup) -> {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, - {"emqttd retainer", emqttd_retainer}, {"emqttd pooler", {supervisor, emqttd_pooler}}, {"emqttd trace", {supervisor, emqttd_trace_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index f346be653..a6e8e10ef 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -308,7 +308,7 @@ plugins(_) -> %%-------------------------------------------------------------------- %% @doc Bridges command bridges(["list"]) -> - foreach(fun({{Node, Topic}, _Pid}) -> + foreach(fun({Node, Topic, _Pid}) -> ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node]) end, emqttd_bridge_sup_sup:bridges()); diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 7b7dedcf9..12bd2942a 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -140,14 +140,14 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, TopicTable}, State) -> - with_session(fun(SessPid) -> - emqttd_session:subscribe(SessPid, TopicTable) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({subscribe, TopicTable}, ProtoState) + end, State); handle_cast({unsubscribe, Topics}, State) -> - with_session(fun(SessPid) -> - emqttd_session:unsubscribe(SessPid, Topics) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({unsubscribe, Topics}, ProtoState) + end, State); handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). @@ -249,10 +249,6 @@ with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), hibernate(State#client_state{proto_state = ProtoState1}). -with_session(Fun, State = #client_state{proto_state = ProtoState}) -> - Fun(emqttd_protocol:session(ProtoState)), - hibernate(State). - %% Receive and parse tcp data received(<<>>, State) -> hibernate(State); diff --git a/src/emqttd_conf.erl b/src/emqttd_conf.erl index c4a78fd9b..b3677d6b4 100644 --- a/src/emqttd_conf.erl +++ b/src/emqttd_conf.erl @@ -16,84 +16,97 @@ -module(emqttd_conf). --export([mqtt/0, retained/0, session/0, queue/0, bridge/0, pubsub/0]). +-export([init/0]). + +-export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]). + +-export([value/1, value/2, list/1]). + +-define(APP, emqttd). + +init() -> gen_conf:init(?APP). mqtt() -> - [ - %% Max ClientId Length Allowed. - {max_clientid_len, emqttd:conf(mqtt_max_clientid_len, 512)}, - %% Max Packet Size Allowed, 64K by default. - {max_packet_size, emqttd:conf(mqtt_max_packet_size, 65536)}, - %% Client Idle Timeout. - {client_idle_timeout, emqttd:conf(mqtt_client_idle_timeout, 30)} - ]. - -retained() -> - [ - %% Expired after seconds, never expired if 0 - {expired_after, emqttd:conf(retained_expired_after, 0)}, - %% Max number of retained messages - {max_message_num, emqttd:conf(retained_max_message_num, 100000)}, - %% Max Payload Size of retained message - {max_playload_size, emqttd:conf(retained_max_playload_size, 65536)} - ]. + with_env(mqtt_protocol, [ + %% Max ClientId Length Allowed. + {max_clientid_len, value(mqtt_max_clientid_len, 512)}, + %% Max Packet Size Allowed, 64K by default. + {max_packet_size, value(mqtt_max_packet_size, 65536)}, + %% Client Idle Timeout. + {client_idle_timeout, value(mqtt_client_idle_timeout, 30)} + ]). session() -> - [ - %% Max number of QoS 1 and 2 messages that can be “inflight” at one time. - %% 0 means no limit - {max_inflight, emqttd:conf(session_max_inflight, 100)}, + with_env(mqtt_session, [ + %% Max number of QoS 1 and 2 messages that can be “inflight” at one time. + %% 0 means no limit + {max_inflight, value(session_max_inflight, 100)}, - %% Retry interval for redelivering QoS1/2 messages. - {unack_retry_interval, emqttd:conf(session_unack_retry_interval, 60)}, + %% Retry interval for redelivering QoS1/2 messages. + {unack_retry_interval, value(session_unack_retry_interval, 60)}, - %% Awaiting PUBREL Timeout - {await_rel_timeout, emqttd:conf(session_await_rel_timeout, 20)}, + %% Awaiting PUBREL Timeout + {await_rel_timeout, value(session_await_rel_timeout, 20)}, - %% Max Packets that Awaiting PUBREL, 0 means no limit - {max_awaiting_rel, emqttd:conf(session_max_awaiting_rel, 0)}, + %% Max Packets that Awaiting PUBREL, 0 means no limit + {max_awaiting_rel, value(session_max_awaiting_rel, 0)}, - %% Statistics Collection Interval(seconds) - {collect_interval, emqttd:conf(session_collect_interval, 0)}, + %% Statistics Collection Interval(seconds) + {collect_interval, value(session_collect_interval, 0)}, - %% Expired after 2 day (unit: minute) - {expired_after, emqttd:conf(session_expired_after, 2880)} - ]. + %% Expired after 2 day (unit: minute) + {expired_after, value(session_expired_after, 2880)} + ]). queue() -> - [ - %% Type: simple | priority - {type, emqttd:conf(queue_type, simple)}, + with_env(mqtt_queue, [ + %% Type: simple | priority + {type, value(queue_type, simple)}, - %% Topic Priority: 0~255, Default is 0 - {priority, emqttd:conf(queue_priority, [])}, + %% Topic Priority: 0~255, Default is 0 + {priority, value(queue_priority, [])}, - %% Max queue length. Enqueued messages when persistent client disconnected, - %% or inflight window is full. - {max_length, emqttd:conf(queue_max_length, infinity)}, + %% Max queue length. Enqueued messages when persistent client disconnected, + %% or inflight window is full. + {max_length, value(queue_max_length, infinity)}, - %% Low-water mark of queued messages - {low_watermark, emqttd:conf(queue_low_watermark, 0.2)}, + %% Low-water mark of queued messages + {low_watermark, value(queue_low_watermark, 0.2)}, - %% High-water mark of queued messages - {high_watermark, emqttd:conf(queue_high_watermark, 0.6)}, + %% High-water mark of queued messages + {high_watermark, value(queue_high_watermark, 0.6)}, - %% Queue Qos0 messages? - {queue_qos0, emqttd:conf(queue_qos0, true)} - ]. + %% Queue Qos0 messages? + {queue_qos0, value(queue_qos0, true)} + ]). bridge() -> - [ - %% TODO: Bridge Queue Size - {max_queue_len, emqttd:conf(bridge_max_queue_len, 10000)}, + with_env(mqtt_bridge, [ + {max_queue_len, value(bridge_max_queue_len, 10000)}, - %% Ping Interval of bridge node - {ping_down_interval, emqttd:conf(bridge_ping_down_interval, 1)} - ]. + %% Ping Interval of bridge node + {ping_down_interval, value(bridge_ping_down_interval, 1)} + ]). pubsub() -> - [ - %% PubSub and Router. Default should be scheduler numbers. - {pool_size, emqttd:conf(pubsub_pool_size, 8)} - ]. + with_env(mqtt_pubsub, [ + %% PubSub and Router. Default should be scheduler numbers. + {pool_size, value(pubsub_pool_size, 8)} + ]). + +value(Key) -> + with_env(Key, gen_conf:value(?APP, Key)). + +value(Key, Default) -> + with_env(Key, gen_conf:value(?APP, Key, Default)). + +with_env(Key, Conf) -> + case application:get_env(?APP, Key) of + undefined -> + application:set_env(?APP, Key, Conf), Conf; + {ok, Val} -> + Val + end. + +list(Key) -> gen_conf:list(?APP, Key). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 7815e88be..29fe2915d 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd presence management module -module(emqttd_mod_presence). -behaviour(emqttd_gen_mod). diff --git a/src/emqttd_retainer.erl b/src/emqttd_mod_retainer.erl similarity index 58% rename from src/emqttd_retainer.erl rename to src/emqttd_mod_retainer.erl index 435bd7abc..5742a264c 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_mod_retainer.erl @@ -14,115 +14,119 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT retained message. --module(emqttd_retainer). +-module(emqttd_mod_retainer). -behaviour(gen_server). +-behaviour(emqttd_gen_mod). + -include("emqttd.hrl"). -include("emqttd_internal.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). -%% Mnesia Callbacks --export([mnesia/1]). +%% gen_mod Callbacks +-export([load/1, unload/1]). --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). +%% Hook Callbacks +-export([on_session_subscribed/4, on_message_publish/2]). %% API Function Exports --export([retain/1, read_messages/1, dispatch/2]). - -%% API Function Exports --export([start_link/0]). +-export([start_link/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(retained_message, {topic, msg}). +-record(mqtt_retained, {topic, msg}). -record(state, {stats_fun, expired_after, stats_timer, expire_timer}). %%-------------------------------------------------------------------- -%% Mnesia callbacks +%% Load/Unload %%-------------------------------------------------------------------- -mnesia(boot) -> - ok = emqttd_mnesia:create_table(retained_message, [ - {type, ordered_set}, - {disc_copies, [node()]}, - {record_name, retained_message}, - {attributes, record_info(fields, retained_message)}, - {storage_properties, [{ets, [compressed]}, - {dets, [{auto_save, 1000}]}]}]); +load(Env) -> + emqttd_mod_sup:start_child(spec(Env)), + emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]), + emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]). -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(retained_message). +on_session_subscribed(_ClientId, _Username, {Topic, _Opts}, _Env) -> + SessPid = self(), + Msgs = case emqttd_topic:wildcard(Topic) of + false -> read_messages(Topic); + true -> match_messages(Topic) + end, + lists:foreach(fun(Msg) -> SessPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)). + +on_message_publish(Msg = #mqtt_message{retain = false}, _Env) -> + {ok, Msg}; + +%% RETAIN flag set to 1 and payload containing zero bytes +on_message_publish(Msg = #mqtt_message{retain = true, topic = Topic, payload = <<>>}, _Env) -> + mnesia:dirty_delete(mqtt_retained, Topic), + {stop, Msg}; + +on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}, Env) -> + case {is_table_full(Env), is_too_big(size(Payload), Env)} of + {false, false} -> + mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg}), + emqttd_metrics:set('messages/retained', retained_count()); + {true, _}-> + lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); + {_, true}-> + lager:error("Cannot retain message(topic=~s, payload_size=~p)" + " for payload is too big!", [Topic, size(Payload)]) + end, + {ok, Msg#mqtt_message{retain = false}}. + +is_table_full(Env) -> + Limit = proplists:get_value(max_message_num, Env, 0), + Limit > 0 andalso (retained_count() > Limit). + +is_too_big(Size, Env) -> + Limit = proplists:get_value(max_payload_size, Env, 0), + Limit > 0 andalso (Size > Limit). + +unload(_Env) -> + emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4), + emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2), + emqttd_mod_sup:stop_child(?MODULE). + +spec(Env) -> + {?MODULE, {?MODULE, start_link, [Env]}, permanent, 5000, worker, [?MODULE]}. %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- %% @doc Start the retainer --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec(start_link(Env :: list()) -> {ok, pid()} | ignore | {error, any()}). +start_link(Env) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). -%% @doc Retain a message --spec(retain(mqtt_message()) -> ok | ignore). -retain(#mqtt_message{retain = false}) -> ignore; +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- -%% RETAIN flag set to 1 and payload containing zero bytes -retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - delete_message(Topic); - -retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) -> - TabSize = retained_count(), - case {TabSize < limit(table), size(Payload) < limit(payload)} of - {true, true} -> - retain_message(Msg), - emqttd_metrics:set('messages/retained', retained_count()); - {false, _}-> - lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); - {_, false}-> - lager:error("Cannot retain message(topic=~s, payload_size=~p)" - " for payload is too big!", [Topic, size(Payload)]) - end, ok. - -limit(table) -> env(max_message_num); -limit(payload) -> env(max_playload_size). - -env(Key) -> - case get({retained, Key}) of - undefined -> - Env = emqttd_conf:retained(), - Val = proplists:get_value(Key, Env), - put({retained, Key}, Val), Val; - Val -> - Val - end. - -%% @doc Deliver retained messages to the subscriber --spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()). -dispatch(Topic, CPid) when is_binary(Topic) -> - Msgs = case emqttd_topic:wildcard(Topic) of - false -> read_messages(Topic); - true -> match_messages(Topic) +init([Env]) -> + Copy = case proplists:get_value(storage_type, Env, disc) of + disc -> disc_copies; + ram -> ram_copies end, - lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([]) -> + ok = emqttd_mnesia:create_table(mqtt_retained, [ + {type, ordered_set}, + {Copy, [node()]}, + {record_name, mqtt_retained}, + {attributes, record_info(fields, mqtt_retained)}, + {storage_properties, [{ets, [compressed]}, + {dets, [{auto_save, 1000}]}]}]), + ok = emqttd_mnesia:copy_table(mqtt_retained), StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'), - %% One second {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), State = #state{stats_fun = StatsFun, stats_timer = StatsTimer}, - {ok, init_expire_timer(env(expired_after), State)}. + {ok, init_expire_timer(proplists:get_value(expired_after, Env, 0), State)}. init_expire_timer(0, State) -> State; @@ -164,43 +168,35 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- --spec(retain_message(mqtt_message()) -> ok). -retain_message(Msg = #mqtt_message{topic = Topic}) -> - mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}). - -spec(read_messages(binary()) -> [mqtt_message()]). read_messages(Topic) -> - [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. + [Msg || #mqtt_retained{msg = Msg} <- mnesia:dirty_read(mqtt_retained, Topic)]. -spec(match_messages(binary()) -> [mqtt_message()]). match_messages(Filter) -> %% TODO: optimize later... - Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> + Fun = fun(#mqtt_retained{topic = Name, msg = Msg}, Acc) -> case emqttd_topic:match(Name, Filter) of true -> [Msg|Acc]; false -> Acc end end, - mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). - --spec(delete_message(binary()) -> ok). -delete_message(Topic) -> - mnesia:dirty_delete(retained_message, Topic). + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], mqtt_retained]). -spec(expire_messages(pos_integer()) -> any()). expire_messages(Time) when is_integer(Time) -> mnesia:transaction( fun() -> Match = ets:fun2ms( - fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = Ts}}) + fun(#mqtt_retained{topic = Topic, msg = #mqtt_message{timestamp = Ts}}) when Time > Ts -> Topic end), - Topics = mnesia:select(retained_message, Match, write), + Topics = mnesia:select(mqtt_retained, Match, write), lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages - (Topic) -> mnesia:delete({retained_message, Topic}) + (Topic) -> mnesia:delete({mqtt_retained, Topic}) end, Topics) end). -spec(retained_count() -> non_neg_integer()). -retained_count() -> mnesia:table_info(retained_message, size). +retained_count() -> mnesia:table_info(mqtt_retained, size). diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 653fb49c7..df44ab96f 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd rewrite module -module(emqttd_mod_rewrite). -behaviour(emqttd_gen_mod). @@ -23,7 +22,7 @@ -export([load/1, reload/1, unload/1]). --export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). +-export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]). %%-------------------------------------------------------------------- %% API @@ -35,18 +34,18 @@ load(Opts) -> ok; File -> {ok, Terms} = file:consult(File), Sections = compile(Terms), - emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]), - emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]), + emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Sections]), + emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/4, [Sections]), emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]) end. -rewrite_subscribe({_ClientId, _Username}, {Topic, Opts}, Sections) -> - lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]), - {ok, {match_topic(Topic, Sections), Opts}}. +rewrite_subscribe(_ClientId, _Username, TopicTable, Sections) -> + lager:info("Rewrite subscribe: ~p", [TopicTable]), + {ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}. -rewrite_unsubscribe({_ClientId, _Username}, Topic, Sections) -> - lager:info("Rewrite unsubscribe: ~p", [Topic]), - {ok, match_topic(Topic, Sections)}. +rewrite_unsubscribe(_ClientId, _Username, TopicTable, Sections) -> + lager:info("Rewrite unsubscribe: ~p", [TopicTable]), + {ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}. rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) -> %%TODO: this will not work if the client is always online. @@ -71,8 +70,8 @@ reload(File) -> end. unload(_) -> - emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/3), - emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3), + emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4), + emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4), emqttd:unhook('message.publish', fun ?MODULE:rewrite_publish/2). %%-------------------------------------------------------------------- diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index a545dcfaf..904e29084 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Subscription from Broker Side -module(emqttd_mod_subscription). -behaviour(emqttd_gen_mod). @@ -33,7 +32,7 @@ on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId client_pid = ClientPid, username = Username}, Topics) -> - Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, + Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics], emqttd_client:subscribe(ClientPid, TopicTable), {ok, Client}; @@ -44,10 +43,10 @@ on_client_connected(_ConnAck, _Client, _State) -> unload(_Opts) -> emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3). -rep(<<"$c">>, ClientId, Topic) -> - emqttd_topic:feed_var(<<"$c">>, ClientId, Topic); -rep(<<"$u">>, undefined, Topic) -> +rep(<<"%c">>, ClientId, Topic) -> + emqttd_topic:feed_var(<<"%c">>, ClientId, Topic); +rep(<<"%u">>, undefined, Topic) -> Topic; -rep(<<"$u">>, Username, Topic) -> - emqttd_topic:feed_var(<<"$u">>, Username, Topic). +rep(<<"%u">>, Username, Topic) -> + emqttd_topic:feed_var(<<"%u">>, Username, Topic). diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index 759886fd3..a0f3cb9d3 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -21,7 +21,7 @@ -include("emqttd.hrl"). %% API --export([start_link/0, start_child/1, start_child/2]). +-export([start_link/0, start_child/1, start_child/2, stop_child/1]). %% Supervisor callbacks -export([init/1]). @@ -46,6 +46,13 @@ start_child(ChildSpec) when is_tuple(ChildSpec) -> start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). +-spec(stop_child(any()) -> ok | {error, any()}). +stop_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. + %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 3448ce387..81593a7e5 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Protocol Processor. -module(emqttd_protocol). -include("emqttd.hrl"). @@ -28,7 +27,7 @@ %% API -export([init/3, info/1, clientid/1, client/1, session/1]). --export([received/2, send/2, redeliver/2, shutdown/2]). +-export([received/2, handle/2, send/2, redeliver/2, shutdown/2]). -export([process/2]). @@ -116,6 +115,26 @@ received(Packet = ?PACKET(_Type), State) -> {error, Reason, State} end. +handle({subscribe, RawTopicTable}, ProtoState = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + TopicTable = parse_topic_table(RawTopicTable), + case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of + {ok, TopicTable1} -> + emqttd_session:subscribe(Session, TopicTable1); + {stop, _} -> + ok + end, + {ok, ProtoState}; + +handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + {ok, TopicTable} = emqttd:run_hooks('client.unsubscribe', + [ClientId, Username], parse_topics(RawTopics)), + emqttd_session:unsubscribe(Session, TopicTable), + {ok, ProtoState}. + process(Packet = ?CONNECT_PACKET(Var), State0) -> #mqtt_packet_connect{proto_ver = ProtoVer, @@ -197,23 +216,32 @@ process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessi process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> +process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{ + client_id = ClientId, username = Username, session = Session}) -> Client = client(State), - AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable], + TopicTable = parse_topic_table(RawTopicTable), + AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable], case lists:member(deny, AllowDenies) of true -> ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State), send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> - emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State} + case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of + {ok, TopicTable1} -> + emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State}; + {stop, _} -> + {ok, State} + end end; %% Protect from empty topic list process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - emqttd_session:unsubscribe(Session, Topics), +process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), State = #proto_state{ + client_id = ClientId, username = Username, session = Session}) -> + {ok, TopicTable} = emqttd:run_hooks('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)), + emqttd_session:unsubscribe(Session, TopicTable), send(?UNSUBACK_PACKET(PacketId), State); process(?PACKET(?PINGREQ), State) -> @@ -249,7 +277,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). send(Msg, State = #proto_state{client_id = ClientId, username = Username}) when is_record(Msg, mqtt_message) -> - emqttd:run_hooks('message.delivered', [{ClientId, Username}], Msg), + emqttd:run_hooks('message.delivered', [ClientId, Username], Msg), send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun}) @@ -393,6 +421,15 @@ validate_qos(Qos) when ?IS_QOS(Qos) -> validate_qos(_) -> false. +parse_topic_table(TopicTable) -> + lists:map(fun({Topic0, Qos}) -> + {Topic, Opts} = emqttd_topic:parse(Topic0), + {Topic, [{qos, Qos}|Opts]} + end, TopicTable). + +parse_topics(Topics) -> + [emqttd_topic:parse(Topic) || Topic <- Topics]. + %% PUBLISH ACL is cached in process dictionary. check_acl(publish, Topic, Client) -> IfCache = emqttd:conf(cache_acl, true), @@ -412,4 +449,3 @@ check_acl(subscribe, Topic, Client) -> sp(true) -> 1; sp(false) -> 0. - diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index 6e6c8620f..7cf4c3007 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -91,12 +91,7 @@ publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), case emqttd_hook:run('message.publish', [], Msg) of {ok, Msg1 = #mqtt_message{topic = Topic}} -> - %% Retain message first. Don't create retained topic. - Msg2 = case emqttd_retainer:retain(Msg1) of - ok -> emqttd_message:unset_flag(Msg1); - ignore -> Msg1 - end, - emqttd_pubsub:publish(Topic, Msg2); + emqttd_pubsub:publish(Topic, Msg1); {stop, Msg1} -> lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]), ignore diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index be2ac3611..85701e249 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -84,7 +84,7 @@ packet_id = 1, %% Client’s subscriptions. - subscriptions :: dict:dict(), + subscriptions :: map(), %% Inflight qos1, qos2 messages sent to the client but unacked, %% QoS 1 and QoS 2 messages which have been sent to the Client, @@ -162,16 +162,14 @@ destroy(SessPid, ClientId) -> %%-------------------------------------------------------------------- %% @doc Subscribe Topics --spec(subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok). +-spec(subscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok). subscribe(SessPid, TopicTable) -> gen_server2:cast(SessPid, {subscribe, TopicTable, fun(_) -> ok end}). --spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok). -subscribe(SessPid, PacketId, TopicTable) -> - From = self(), - AckFun = fun(GrantedQos) -> - From ! {suback, PacketId, GrantedQos} - end, +-spec(subscribe(pid(), mqtt_pktid(), [{binary(), [emqttd_topic:option()]}]) -> ok). +subscribe(SessPid, PktId, TopicTable) -> + From = self(), + AckFun = fun(GrantedQos) -> From ! {suback, PktId, GrantedQos} end, gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). %% @doc Publish message @@ -206,9 +204,9 @@ pubcomp(SessPid, PktId) -> gen_server2:cast(SessPid, {pubcomp, PktId}). %% @doc Unsubscribe Topics --spec(unsubscribe(pid(), [binary()]) -> ok). -unsubscribe(SessPid, Topics) -> - gen_server2:cast(SessPid, {unsubscribe, Topics}). +-spec(unsubscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok). +unsubscribe(SessPid, TopicTable) -> + gen_server2:cast(SessPid, {unsubscribe, TopicTable}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -223,7 +221,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> client_id = ClientId, client_pid = ClientPid, username = Username, - subscriptions = dict:new(), + subscriptions = #{}, inflight_queue = [], max_inflight = get_value(max_inflight, SessEnv, 0), message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()), @@ -250,10 +248,10 @@ prioritise_cast(Msg, _Len, _State) -> case Msg of {destroy, _} -> 10; {resume, _, _} -> 9; - {pubrel, _PktId} -> 8; - {pubcomp, _PktId} -> 8; - {pubrec, _PktId} -> 8; - {puback, _PktId} -> 7; + {pubrel, _} -> 8; + {pubcomp, _} -> 8; + {pubrec, _} -> 8; + {puback, _} -> 7; {unsubscribe, _, _} -> 6; {subscribe, _, _} -> 5; _ -> 0 @@ -288,67 +286,48 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -%%TODO: 2.0 FIX - handle_cast({subscribe, TopicTable, AckFun}, Session = #session{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> ?LOG(info, "Subscribe ~p", [TopicTable], Session), {GrantedQos, Subscriptions1} = - lists:foldl(fun({RawTopic, Qos}, {QosAcc, SubDict}) -> - {Topic, Opts} = emqttd_topic:strip(RawTopic), - case emqttd:run_hooks('client.subscribe', [{ClientId, Username}], {Topic, Opts}) of - {ok, {Topic1, Opts1}} -> - NewQos = proplists:get_value(qos, Opts1, Qos), - {[NewQos | QosAcc], case dict:find(Topic, SubDict) of - {ok, NewQos} -> - ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session), - SubDict; - {ok, OldQos} -> - emqttd:setqos(Topic, ClientId, NewQos), - ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], Session), - dict:store(Topic, NewQos, SubDict); - error -> - emqttd:subscribe(Topic1, ClientId, Opts1), - %%TODO: the design is ugly... - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - emqttd_retainer:dispatch(Topic1, self()), - emqttd:run_hooks('client.subscribe.after', [{ClientId, Username}], {Topic1, Opts1}), - - dict:store(Topic1, NewQos, SubDict) - end}; - {stop, _} -> - ?LOG(error, "Cannot subscribe: ~p", [Topic], Session), - {[128 | QosAcc], SubDict} - end - end, {[], Subscriptions}, TopicTable), + lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> + NewQos = proplists:get_value(qos, Opts), + SubMap1 = + case maps:find(Topic, SubMap) of + {ok, NewQos} -> + ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session), + SubMap; + {ok, OldQos} -> + emqttd:setqos(Topic, ClientId, NewQos), + ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", + [Topic, OldQos, NewQos], Session), + maps:put(Topic, NewQos, SubMap); + error -> + emqttd:subscribe(Topic, ClientId, Opts), + emqttd:run_hooks('session.subscribed', [ClientId, Username], {Topic, Opts}), + maps:put(Topic, NewQos, SubMap) + end, + {[NewQos|QosAcc], SubMap1} + end, {[], Subscriptions}, TopicTable), AckFun(lists:reverse(GrantedQos)), hibernate(Session#session{subscriptions = Subscriptions1}); -%%TODO: 2.0 FIX - -handle_cast({unsubscribe, Topics}, Session = #session{client_id = ClientId, - username = Username, - subscriptions = Subscriptions}) -> - ?LOG(info, "unsubscribe ~p", [Topics], Session), +handle_cast({unsubscribe, TopicTable}, Session = #session{client_id = ClientId, + username = Username, + subscriptions = Subscriptions}) -> + ?LOG(info, "unsubscribe ~p", [TopicTable], Session), Subscriptions1 = - lists:foldl(fun(RawTopic, SubDict) -> - {Topic0, _Opts} = emqttd_topic:strip(RawTopic), - case emqttd:run_hooks('client.unsubscribe', [{ClientId, Username}], Topic0) of - {ok, Topic1} -> - case dict:find(Topic1, SubDict) of - {ok, _Qos} -> - emqttd:unsubscribe(Topic1, ClientId), - dict:erase(Topic1, SubDict); - error -> - SubDict - end; - {stop, _} -> - SubDict - end - end, Subscriptions, Topics), + lists:foldl(fun({Topic, Opts}, SubMap) -> + case maps:find(Topic, SubMap) of + {ok, _Qos} -> + emqttd:unsubscribe(Topic, ClientId), + emqttd:run_hooks('session.unsubscribed', [ClientId, Username], {Topic, Opts}), + maps:remove(Topic, SubMap); + error -> + SubMap + end + end, Subscriptions, TopicTable), hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> @@ -582,8 +561,8 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end. -tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) -> - case dict:find(Topic, Subscriptions) of +tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, SubMap) -> + case maps:find(Topic, SubMap) of {ok, SubQos} when PubQos > SubQos -> Msg#mqtt_message{qos = SubQos}; {ok, _SubQos} -> @@ -664,7 +643,7 @@ acked(PktId, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> case lists:keyfind(PktId, 1, InflightQ) of {_, Msg} -> - emqttd:run_hooks('message.acked', [{ClientId, Username}], Msg); + emqttd:run_hooks('message.acked', [ClientId, Username], Msg); false -> ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session) end, diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index ebd16714d..2a198c2e1 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -16,23 +16,27 @@ -module(emqttd_topic). +-include("emqttd_protocol.hrl"). + -import(lists, [reverse/1]). -export([match/2, validate/1, triples/1, words/1, wildcard/1]). -export([join/1, feed_var/3, systop/1]). --export([strip/1, strip/2]). +-export([parse/1, parse/2]). -type(topic() :: binary()). +-type(option() :: local | {qos, mqtt_qos()} | {share, '$queue' | binary()}). + -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). -type(triple() :: {root | binary(), word(), binary()}). --export_type([topic/0, word/0, triple/0]). +-export_type([topic/0, option/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 4096). @@ -172,28 +176,28 @@ join(Words) -> end, {true, <<>>}, [bin(W) || W <- Words]), Bin. --spec(strip(topic()) -> {topic(), [local | {share, binary()}]}). -strip(Topic) when is_binary(Topic) -> - strip(Topic, []). +-spec(parse(topic()) -> {topic(), [option()]}). +parse(Topic) when is_binary(Topic) -> + parse(Topic, []). -strip(Topic = <<"$local/", Topic1/binary>>, Options) -> +parse(Topic = <<"$local/", Topic1/binary>>, Options) -> case lists:member(local, Options) of true -> error({invalid_topic, Topic}); - false -> strip(Topic1, [local | Options]) + false -> parse(Topic1, [local | Options]) end; -strip(Topic = <<"$queue/", Topic1/binary>>, Options) -> +parse(Topic = <<"$queue/", Topic1/binary>>, Options) -> case lists:keyfind(share, 1, Options) of {share, _} -> error({invalid_topic, Topic}); - false -> strip(Topic1, [{share, '$queue'} | Options]) + false -> parse(Topic1, [{share, '$queue'} | Options]) end; -strip(Topic = <<"$share/", Topic1/binary>>, Options) -> +parse(Topic = <<"$share/", Topic1/binary>>, Options) -> case lists:keyfind(share, 1, Options) of {share, _} -> error({invalid_topic, Topic}); false -> [Share, Topic2] = binary:split(Topic1, <<"/">>), {Topic2, [{share, Share} | Options]} end; -strip(Topic, Options) -> {Topic, Options}. +parse(Topic, Options) -> {Topic, Options}. diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index d8f30d309..3ea9cdbdd 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -97,14 +97,14 @@ handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> - with_session(fun(SessPid) -> - emqttd_session:subscribe(SessPid, TopicTable) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({subscribe, TopicTable}, ProtoState) + end, State); handle_cast({unsubscribe, Topics}, State) -> - with_session(fun(SessPid) -> - emqttd_session:unsubscribe(SessPid, Topics) - end, State); + with_proto_state(fun(ProtoState) -> + emqttd_protocol:handle({unsubscribe, Topics}, ProtoState) + end, State); handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of @@ -194,9 +194,6 @@ with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), noreply(State#wsclient_state{proto_state = ProtoState1}). -with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) -> - Fun(emqttd_protocol:session(ProtoState)), noreply(State). - noreply(State) -> {noreply, State, hibernate}. diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 06923ee8e..846ee1955 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -29,7 +29,7 @@ all() -> {group, pubsub}, {group, router}, {group, session}, - {group, retainer}, + %%{group, retainer}, {group, broker}, {group, metrics}, {group, stats}, @@ -60,8 +60,6 @@ groups() -> {hook, [sequence], [add_delete_hook, run_hooks]}, - {retainer, [sequence], - [dispatch_retained_messages]}, {backend, [sequence], []}, {http, [sequence], @@ -237,10 +235,10 @@ start_session(_) -> Message1 = Message#mqtt_message{pktid = 1}, emqttd_session:publish(SessPid, Message1), emqttd_session:pubrel(SessPid, 1), - emqttd_session:subscribe(SessPid, [{<<"topic/session">>, 2}]), + emqttd_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), Message2 = emqttd_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>), emqttd_session:publish(SessPid, Message2), - emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]), + emqttd_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]), emqttd_mock_client:stop(ClientPid). %%-------------------------------------------------------------------- @@ -303,20 +301,6 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok. hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. -%%-------------------------------------------------------------------- -%% Retainer Test -%%-------------------------------------------------------------------- - -dispatch_retained_messages(_) -> - Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>, - payload = <<"payload">>}, - emqttd_retainer:retain(Msg), - emqttd_retainer:dispatch(<<"a/b/+">>, self()), - ?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end), - emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}), - [] = emqttd_retainer:read_messages(<<"a/b/c">>). - - %%-------------------------------------------------------------------- %% HTTP Request Test %%-------------------------------------------------------------------- diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf index 562bd908f..68546ed89 100644 --- a/test/emqttd_SUITE_data/emqttd.conf +++ b/test/emqttd_SUITE_data/emqttd.conf @@ -54,19 +54,6 @@ %% System interval of publishing broker $SYS messages {broker_sys_interval, 60}. -%%-------------------------------------------------------------------- -%% Retained message -%%-------------------------------------------------------------------- - -%% Expired after seconds, never expired if 0 -{retained_expired_after, 0}. - -%% Max number of retained messages -{retained_max_message_num, 100000}. - -%% Max Payload Size of retained message -{retained_max_playload_size, 65536}. - %%-------------------------------------------------------------------- %% Session %%-------------------------------------------------------------------- @@ -232,6 +219,23 @@ %% Modules %%-------------------------------------------------------------------- +%% Retainer Module +{module, retainer, [ + + %% disc: disc_copies, ram: ram_copies + {storage, ram}, + + %% Max number of retained messages + {max_message_num, 100000}, + + %% Max Payload Size of retained message + {max_playload_size, 65536}, + + %% Expired after seconds, never expired if 0 + {expired_after, 0} + +]}. + %% Client presence management module. Publish presence messages when %% client connected or disconnected. {module, presence, [{qos, 0}]}. diff --git a/test/emqttd_access_SUITE.erl b/test/emqttd_access_SUITE.erl index 074fd15df..bad624157 100644 --- a/test/emqttd_access_SUITE.erl +++ b/test/emqttd_access_SUITE.erl @@ -52,8 +52,8 @@ prepare_config() -> {allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}, {allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]}, {allow, {client, "testClient"}, subscribe, ["testTopics/testClient"]}, - {allow, all, subscribe, ["clients/$c"]}, - {allow, all, pubsub, ["users/$u/#"]}, + {allow, all, subscribe, ["clients/%c"]}, + {allow, all, pubsub, ["users/%u/#"]}, {deny, all, subscribe, ["$SYS/#", "#"]}, {deny, all}], write_config("access_SUITE_acl.conf", Rules), @@ -151,10 +151,10 @@ compile_rule(_) -> compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}), {allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} = compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}), - {allow, all, pubsub, [{pattern, [<<"clients">>, <<"$c">>]}]} = - compile({allow, all, pubsub, ["clients/$c"]}), - {allow, all, subscribe, [{pattern, [<<"users">>, <<"$u">>, '#']}]} = - compile({allow, all, subscribe, ["users/$u/#"]}), + {allow, all, pubsub, [{pattern, [<<"clients">>, <<"%c">>]}]} = + compile({allow, all, pubsub, ["clients/%c"]}), + {allow, all, subscribe, [{pattern, [<<"users">>, <<"%u">>, '#']}]} = + compile({allow, all, subscribe, ["users/%u/#"]}), {deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = compile({deny, all, subscribe, ["$SYS/#", "#"]}), {allow, all} = compile({allow, all}), @@ -171,9 +171,9 @@ match_rule(_) -> {matched, allow} = match(User, <<"d/e/f/x">>, compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})), nomatch = match(User, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})), {matched, allow} = match(User, <<"testTopics/testClient">>, compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})), - {matched, allow} = match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/$c"]})), + {matched, allow} = match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})), {matched, allow} = match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>, - compile({allow, all, subscribe, ["users/$u/#"]})), + compile({allow, all, subscribe, ["users/%u/#"]})), {matched, deny} = match(User, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})), Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}), nomatch = match(User, <<"Topic">>, Rule), diff --git a/test/emqttd_topic_SUITE.erl b/test/emqttd_topic_SUITE.erl index 5e9608e00..5692dbb43 100644 --- a/test/emqttd_topic_SUITE.erl +++ b/test/emqttd_topic_SUITE.erl @@ -22,14 +22,14 @@ -compile(export_all). -import(emqttd_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, - words/1, systop/1, feed_var/3, strip/1, strip/2]). + words/1, systop/1, feed_var/3, parse/1, parse/2]). -define(N, 10000). all() -> [t_wildcard, t_match, t_match2, t_validate, t_triples, t_join, t_words, t_systop, t_feed_var, t_sys_match, 't_#_match', t_sigle_level_validate, t_sigle_level_match, t_match_perf, - t_triples_perf, t_strip]. + t_triples_perf, t_parse]. t_wildcard(_) -> true = wildcard(<<"a/b/#">>), @@ -171,11 +171,11 @@ t_feed_var(_) -> long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]). -t_strip(_) -> - ?assertEqual({<<"a/b/+/#">>, []}, strip(<<"a/b/+/#">>)), - ?assertEqual({<<"topic">>, [{share, '$queue'}]}, strip(<<"$queue/topic">>)), - ?assertEqual({<<"topic">>, [{share, <<"group">>}]}, strip(<<"$share/group/topic">>)), - ?assertEqual({<<"topic">>, [local]}, strip(<<"$local/topic">>)), - ?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, strip(<<"$local/$queue/topic">>)), - ?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, strip(<<"$local/$share/group//a/b/c">>)). +t_parse(_) -> + ?assertEqual({<<"a/b/+/#">>, []}, parse(<<"a/b/+/#">>)), + ?assertEqual({<<"topic">>, [{share, '$queue'}]}, parse(<<"$queue/topic">>)), + ?assertEqual({<<"topic">>, [{share, <<"group">>}]}, parse(<<"$share/group/topic">>)), + ?assertEqual({<<"topic">>, [local]}, parse(<<"$local/topic">>)), + ?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, parse(<<"$local/$queue/topic">>)), + ?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, parse(<<"$local/$share/group//a/b/c">>)).