Merge pull request #685 from emqtt/emq20

2.0-beta.2: improve the hooks design and consolidate the variables' format
This commit is contained in:
Feng Lee 2016-09-10 13:41:55 +08:00 committed by GitHub
commit 66494ddcc0
25 changed files with 430 additions and 384 deletions

View File

@ -8,7 +8,7 @@ dep_gproc = git https://github.com/uwiger/gproc.git
dep_lager = git https://github.com/basho/lager.git
dep_gen_conf = git https://github.com/emqtt/gen_conf.git
dep_gen_logger = git https://github.com/emqtt/gen_logger.git
dep_esockd = git https://github.com/emqtt/esockd.git udp
dep_esockd = git https://github.com/emqtt/esockd.git emq20
dep_mochiweb = git https://github.com/emqtt/mochiweb.git
ERLC_OPTS += +'{parse_transform, lager_transform}'

View File

@ -5,11 +5,44 @@
Changes
=======
.. _release_2.0:
.. _release_2.0_beta.2:
-------------------------------------
Version 2.0-beta1 (West of West Lake)
-------------------------------------
------------------
Version 2.0-beta.2
------------------
*Release Date: 2016-09-10*
CoAP Support
------------
Release an experimental CoAP Gateway: https://github.com/emqtt/emqttd_coap
API Breaking Changes
--------------------
'$u', '$c' variables in emqttd.conf and modules/acl.conf changed to '%u', '%c'
Improve the design of mqtt retained message, replace emqttd_retainer with emqttd_mod_retainer.
Add 'session.subscribed', 'session.unsubscribed' hooks, remove 'client.subscribe.after' hook
Tab 'retained_message' -> 'mqtt_retained'
Bugfix
------
[2.0 beta1] FORMAT ERROR: "~s PUBLISH to ~s: ~p" (PR #671)
Fixing issues in cluster mode. (PR #681)
Fixing issues with unsubscribe hook (PR #673)
.. _release_2.0_beta.1:
------------------
Version 2.0-beta.1
------------------
*Release Date: 2016-08-30*

View File

@ -337,10 +337,12 @@ Hooks defined by the emqttd 1.0 broker:
+------------------------+------------------------------------------------------+
| client.subscribe | Run before client subscribes topics |
+------------------------+------------------------------------------------------+
| client.subscribe.after | Run After client subscribed topics |
+------------------------+------------------------------------------------------+
| client.unsubscribe | Run when client unsubscribes topics |
+------------------------+------------------------------------------------------+
| session.subscribed | Run After client(session) subscribed a topic |
+------------------------+------------------------------------------------------+
| session.unsubscribed | Run After client(session) unsubscribed a topic |
+------------------------+------------------------------------------------------+
| message.publish | Run when a MQTT message is published |
+------------------------+------------------------------------------------------+
| message.delivered | Run when a MQTT message is delivered |
@ -517,4 +519,6 @@ Mnesia/ETS Tables
+--------------------+--------+----------------------------------------+
| mqtt_client | ets | Client Table |
+--------------------+--------+----------------------------------------+
| mqtt_retained | mnesia | Retained Message Table |
+--------------------+--------+----------------------------------------+

View File

@ -812,26 +812,28 @@ Register Callbacks for Hooks
The plugin could register callbacks for hooks. The hooks will be run by the broker when a client connected/disconnected, a topic subscribed/unsubscribed or a message published/delivered:
+------------------------+---------------------------------------+
| Name | Description |
+------------------------+---------------------------------------+
| client.connected | Run when a client connected to the |
| | broker successfully |
+------------------------+---------------------------------------+
| client.subscribe | Run before a client subscribes topics |
+------------------------+---------------------------------------+
| client.subscribe.after | Run after a client subscribed topics |
+------------------------+---------------------------------------+
| client.unsubscribe | Run when a client unsubscribes topics |
+------------------------+---------------------------------------+
| message.publish | Run when a message is published |
+------------------------+---------------------------------------+
| message.delivered | Run when a message is delivered |
+------------------------+---------------------------------------+
| message.acked | Run when a message(qos1/2) is acked |
+------------------------+---------------------------------------+
| client.disconnected | Run when a client is disconnnected |
+------------------------+---------------------------------------+
+------------------------+-----------------------------------------+
| Name | Description |
+------------------------+-----------------------------------------+
| client.connected | Run when a client connected to the |
| | broker successfully |
+------------------------+-----------------------------------------+
| client.subscribe | Run before a client subscribes topics |
+------------------------+-----------------------------------------+
| client.unsubscribe | Run when a client unsubscribes topics |
+------------------------+-----------------------------------------+
| session.subscribed | Run after a client subscribed a topic |
+------------------------+-----------------------------------------+
| session.unsubscribed | Run after a client unsubscribed a topic |
+------------------------+-----------------------------------------+
| message.publish | Run when a message is published |
+------------------------+-----------------------------------------+
| message.delivered | Run when a message is delivered |
+------------------------+-----------------------------------------+
| message.acked | Run when a message(qos1/2) is acked |
+------------------------+-----------------------------------------+
| client.disconnected | Run when a client is disconnnected |
+------------------------+-----------------------------------------+
emqttd_plugin_template.erl for example:
@ -841,13 +843,13 @@ emqttd_plugin_template.erl for example:
load(Env) ->
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
emqttd:hook('client.subscribe.after', fun ?MODULE:on_client_subscribe_after/3, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
Register CLI Modules
--------------------
@ -881,10 +883,10 @@ There will be a new CLI after the plugin loaded::
.. _emqttd_dashboard: https://github.com/emqtt/emqttd_dashboard
.. _emqttd_auth_ldap: https://github.com/emqtt/emqttd_auth_ldap
.. _emqttd_auth_http: https://github.com/emqtt/emqttd_auth_http
.. _emqttd_auth_mysql: https://github.com/emqtt/emqttd_plugin_mysql
.. _emqttd_auth_pgsql: https://github.com/emqtt/emqttd_plugin_pgsql
.. _emqttd_auth_redis: https://github.com/emqtt/emqttd_plugin_redis
.. _emqttd_auth_mongo: https://github.com/emqtt/emqttd_plugin_mongo
.. _emqttd_auth_mysql: https://github.com/emqtt/emqttd_auth_mysql
.. _emqttd_auth_pgsql: https://github.com/emqtt/emqttd_auth_pgsql
.. _emqttd_auth_redis: https://github.com/emqtt/emqttd_auth_redis
.. _emqttd_auth_mongo: https://github.com/emqtt/emqttd_auth_mongo
.. _emqttd_sn: https://github.com/emqtt/emqttd_sn
.. _emqttd_stomp: https://github.com/emqtt/emqttd_stomp
.. _emqttd_sockjs: https://github.com/emqtt/emqttd_sockjs

View File

@ -1,4 +1,4 @@
{deps, [
{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","udp"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}}
{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}}
]}.
{erl_opts, [{parse_transform,lager_transform}]}.

View File

@ -62,10 +62,10 @@ start() -> application:start(?APP).
%% @doc Get Config
-spec(conf(Key :: atom()) -> any()).
conf(Key) -> gen_conf:value(?APP, Key).
conf(Key) -> emqttd_conf:value(Key).
-spec(conf(Key :: atom(), Default :: any()) -> any()).
conf(Key, Default) -> gen_conf:value(?APP, Key, Default).
conf(Key, Default) -> emqttd_conf:value(Key, Default).
%% @doc Environment
-spec(env(Key:: atom()) -> any()).

View File

@ -75,8 +75,8 @@ compile(topic, Topic) ->
end.
'pattern?'(Words) ->
lists:member(<<"$u">>, Words)
orelse lists:member(<<"$c">>, Words).
lists:member(<<"%u">>, Words)
orelse lists:member(<<"%c">>, Words).
bin(L) when is_list(L) ->
list_to_binary(L);
@ -142,13 +142,13 @@ feed_var(Client, Pattern) ->
feed_var(Client, Pattern, []).
feed_var(_Client, [], Acc) ->
lists:reverse(Acc);
feed_var(Client = #mqtt_client{client_id = undefined}, [<<"$c">>|Words], Acc) ->
feed_var(Client, Words, [<<"$c">>|Acc]);
feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"$c">>|Words], Acc) ->
feed_var(Client = #mqtt_client{client_id = undefined}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [<<"%c">>|Acc]);
feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [ClientId |Acc]);
feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) ->
feed_var(Client, Words, [<<"$u">>|Acc]);
feed_var(Client = #mqtt_client{username = Username}, [<<"$u">>|Words], Acc) ->
feed_var(Client = #mqtt_client{username = undefined}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [<<"%u">>|Acc]);
feed_var(Client = #mqtt_client{username = Username}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [Username|Acc]);
feed_var(Client, [W|Words], Acc) ->
feed_var(Client, Words, [W|Acc]).

View File

@ -42,7 +42,7 @@
Reason :: term()).
start(_StartType, _StartArgs) ->
print_banner(),
gen_conf:init(emqttd),
emqttd_conf:init(),
emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup),
@ -81,7 +81,6 @@ start_servers(Sup) ->
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
{"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics},
{"emqttd retainer", emqttd_retainer},
{"emqttd pooler", {supervisor, emqttd_pooler}},
{"emqttd trace", {supervisor, emqttd_trace_sup}},
{"emqttd client manager", {supervisor, emqttd_cm_sup}},

View File

@ -308,7 +308,7 @@ plugins(_) ->
%%--------------------------------------------------------------------
%% @doc Bridges command
bridges(["list"]) ->
foreach(fun({{Node, Topic}, _Pid}) ->
foreach(fun({Node, Topic, _Pid}) ->
?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node])
end, emqttd_bridge_sup_sup:bridges());

View File

@ -140,14 +140,14 @@ handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
handle_cast({subscribe, TopicTable}, State) ->
with_session(fun(SessPid) ->
emqttd_session:subscribe(SessPid, TopicTable)
end, State);
with_proto_state(fun(ProtoState) ->
emqttd_protocol:handle({subscribe, TopicTable}, ProtoState)
end, State);
handle_cast({unsubscribe, Topics}, State) ->
with_session(fun(SessPid) ->
emqttd_session:unsubscribe(SessPid, Topics)
end, State);
with_proto_state(fun(ProtoState) ->
emqttd_protocol:handle({unsubscribe, Topics}, ProtoState)
end, State);
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
@ -249,10 +249,6 @@ with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
hibernate(State#client_state{proto_state = ProtoState1}).
with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)),
hibernate(State).
%% Receive and parse tcp data
received(<<>>, State) ->
hibernate(State);

View File

@ -16,84 +16,97 @@
-module(emqttd_conf).
-export([mqtt/0, retained/0, session/0, queue/0, bridge/0, pubsub/0]).
-export([init/0]).
-export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]).
-export([value/1, value/2, list/1]).
-define(APP, emqttd).
init() -> gen_conf:init(?APP).
mqtt() ->
[
%% Max ClientId Length Allowed.
{max_clientid_len, emqttd:conf(mqtt_max_clientid_len, 512)},
%% Max Packet Size Allowed, 64K by default.
{max_packet_size, emqttd:conf(mqtt_max_packet_size, 65536)},
%% Client Idle Timeout.
{client_idle_timeout, emqttd:conf(mqtt_client_idle_timeout, 30)}
].
retained() ->
[
%% Expired after seconds, never expired if 0
{expired_after, emqttd:conf(retained_expired_after, 0)},
%% Max number of retained messages
{max_message_num, emqttd:conf(retained_max_message_num, 100000)},
%% Max Payload Size of retained message
{max_playload_size, emqttd:conf(retained_max_playload_size, 65536)}
].
with_env(mqtt_protocol, [
%% Max ClientId Length Allowed.
{max_clientid_len, value(mqtt_max_clientid_len, 512)},
%% Max Packet Size Allowed, 64K by default.
{max_packet_size, value(mqtt_max_packet_size, 65536)},
%% Client Idle Timeout.
{client_idle_timeout, value(mqtt_client_idle_timeout, 30)}
]).
session() ->
[
%% Max number of QoS 1 and 2 messages that can be inflight at one time.
%% 0 means no limit
{max_inflight, emqttd:conf(session_max_inflight, 100)},
with_env(mqtt_session, [
%% Max number of QoS 1 and 2 messages that can be inflight at one time.
%% 0 means no limit
{max_inflight, value(session_max_inflight, 100)},
%% Retry interval for redelivering QoS1/2 messages.
{unack_retry_interval, emqttd:conf(session_unack_retry_interval, 60)},
%% Retry interval for redelivering QoS1/2 messages.
{unack_retry_interval, value(session_unack_retry_interval, 60)},
%% Awaiting PUBREL Timeout
{await_rel_timeout, emqttd:conf(session_await_rel_timeout, 20)},
%% Awaiting PUBREL Timeout
{await_rel_timeout, value(session_await_rel_timeout, 20)},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, emqttd:conf(session_max_awaiting_rel, 0)},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, value(session_max_awaiting_rel, 0)},
%% Statistics Collection Interval(seconds)
{collect_interval, emqttd:conf(session_collect_interval, 0)},
%% Statistics Collection Interval(seconds)
{collect_interval, value(session_collect_interval, 0)},
%% Expired after 2 day (unit: minute)
{expired_after, emqttd:conf(session_expired_after, 2880)}
].
%% Expired after 2 day (unit: minute)
{expired_after, value(session_expired_after, 2880)}
]).
queue() ->
[
%% Type: simple | priority
{type, emqttd:conf(queue_type, simple)},
with_env(mqtt_queue, [
%% Type: simple | priority
{type, value(queue_type, simple)},
%% Topic Priority: 0~255, Default is 0
{priority, emqttd:conf(queue_priority, [])},
%% Topic Priority: 0~255, Default is 0
{priority, value(queue_priority, [])},
%% Max queue length. Enqueued messages when persistent client disconnected,
%% or inflight window is full.
{max_length, emqttd:conf(queue_max_length, infinity)},
%% Max queue length. Enqueued messages when persistent client disconnected,
%% or inflight window is full.
{max_length, value(queue_max_length, infinity)},
%% Low-water mark of queued messages
{low_watermark, emqttd:conf(queue_low_watermark, 0.2)},
%% Low-water mark of queued messages
{low_watermark, value(queue_low_watermark, 0.2)},
%% High-water mark of queued messages
{high_watermark, emqttd:conf(queue_high_watermark, 0.6)},
%% High-water mark of queued messages
{high_watermark, value(queue_high_watermark, 0.6)},
%% Queue Qos0 messages?
{queue_qos0, emqttd:conf(queue_qos0, true)}
].
%% Queue Qos0 messages?
{queue_qos0, value(queue_qos0, true)}
]).
bridge() ->
[
%% TODO: Bridge Queue Size
{max_queue_len, emqttd:conf(bridge_max_queue_len, 10000)},
with_env(mqtt_bridge, [
{max_queue_len, value(bridge_max_queue_len, 10000)},
%% Ping Interval of bridge node
{ping_down_interval, emqttd:conf(bridge_ping_down_interval, 1)}
].
%% Ping Interval of bridge node
{ping_down_interval, value(bridge_ping_down_interval, 1)}
]).
pubsub() ->
[
%% PubSub and Router. Default should be scheduler numbers.
{pool_size, emqttd:conf(pubsub_pool_size, 8)}
].
with_env(mqtt_pubsub, [
%% PubSub and Router. Default should be scheduler numbers.
{pool_size, value(pubsub_pool_size, 8)}
]).
value(Key) ->
with_env(Key, gen_conf:value(?APP, Key)).
value(Key, Default) ->
with_env(Key, gen_conf:value(?APP, Key, Default)).
with_env(Key, Conf) ->
case application:get_env(?APP, Key) of
undefined ->
application:set_env(?APP, Key, Conf), Conf;
{ok, Val} ->
Val
end.
list(Key) -> gen_conf:list(?APP, Key).

View File

@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc emqttd presence management module
-module(emqttd_mod_presence).
-behaviour(emqttd_gen_mod).

View File

@ -14,115 +14,119 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT retained message.
-module(emqttd_retainer).
-module(emqttd_mod_retainer).
-behaviour(gen_server).
-behaviour(emqttd_gen_mod).
-include("emqttd.hrl").
-include("emqttd_internal.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% Mnesia Callbacks
-export([mnesia/1]).
%% gen_mod Callbacks
-export([load/1, unload/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% Hook Callbacks
-export([on_session_subscribed/4, on_message_publish/2]).
%% API Function Exports
-export([retain/1, read_messages/1, dispatch/2]).
%% API Function Exports
-export([start_link/0]).
-export([start_link/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(retained_message, {topic, msg}).
-record(mqtt_retained, {topic, msg}).
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
%%--------------------------------------------------------------------
%% Mnesia callbacks
%% Load/Unload
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = emqttd_mnesia:create_table(retained_message, [
{type, ordered_set},
{disc_copies, [node()]},
{record_name, retained_message},
{attributes, record_info(fields, retained_message)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 1000}]}]}]);
load(Env) ->
emqttd_mod_sup:start_child(spec(Env)),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]).
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(retained_message).
on_session_subscribed(_ClientId, _Username, {Topic, _Opts}, _Env) ->
SessPid = self(),
Msgs = case emqttd_topic:wildcard(Topic) of
false -> read_messages(Topic);
true -> match_messages(Topic)
end,
lists:foreach(fun(Msg) -> SessPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
on_message_publish(Msg = #mqtt_message{retain = false}, _Env) ->
{ok, Msg};
%% RETAIN flag set to 1 and payload containing zero bytes
on_message_publish(Msg = #mqtt_message{retain = true, topic = Topic, payload = <<>>}, _Env) ->
mnesia:dirty_delete(mqtt_retained, Topic),
{stop, Msg};
on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}, Env) ->
case {is_table_full(Env), is_too_big(size(Payload), Env)} of
{false, false} ->
mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg}),
emqttd_metrics:set('messages/retained', retained_count());
{true, _}->
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
{_, true}->
lager:error("Cannot retain message(topic=~s, payload_size=~p)"
" for payload is too big!", [Topic, size(Payload)])
end,
{ok, Msg#mqtt_message{retain = false}}.
is_table_full(Env) ->
Limit = proplists:get_value(max_message_num, Env, 0),
Limit > 0 andalso (retained_count() > Limit).
is_too_big(Size, Env) ->
Limit = proplists:get_value(max_payload_size, Env, 0),
Limit > 0 andalso (Size > Limit).
unload(_Env) ->
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd_mod_sup:stop_child(?MODULE).
spec(Env) ->
{?MODULE, {?MODULE, start_link, [Env]}, permanent, 5000, worker, [?MODULE]}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Start the retainer
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(start_link(Env :: list()) -> {ok, pid()} | ignore | {error, any()}).
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
%% @doc Retain a message
-spec(retain(mqtt_message()) -> ok | ignore).
retain(#mqtt_message{retain = false}) -> ignore;
%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------
%% RETAIN flag set to 1 and payload containing zero bytes
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
delete_message(Topic);
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
TabSize = retained_count(),
case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} ->
retain_message(Msg),
emqttd_metrics:set('messages/retained', retained_count());
{false, _}->
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
{_, false}->
lager:error("Cannot retain message(topic=~s, payload_size=~p)"
" for payload is too big!", [Topic, size(Payload)])
end, ok.
limit(table) -> env(max_message_num);
limit(payload) -> env(max_playload_size).
env(Key) ->
case get({retained, Key}) of
undefined ->
Env = emqttd_conf:retained(),
Val = proplists:get_value(Key, Env),
put({retained, Key}, Val), Val;
Val ->
Val
end.
%% @doc Deliver retained messages to the subscriber
-spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()).
dispatch(Topic, CPid) when is_binary(Topic) ->
Msgs = case emqttd_topic:wildcard(Topic) of
false -> read_messages(Topic);
true -> match_messages(Topic)
init([Env]) ->
Copy = case proplists:get_value(storage_type, Env, disc) of
disc -> disc_copies;
ram -> ram_copies
end,
lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
ok = emqttd_mnesia:create_table(mqtt_retained, [
{type, ordered_set},
{Copy, [node()]},
{record_name, mqtt_retained},
{attributes, record_info(fields, mqtt_retained)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 1000}]}]}]),
ok = emqttd_mnesia:copy_table(mqtt_retained),
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
%% One second
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
State = #state{stats_fun = StatsFun, stats_timer = StatsTimer},
{ok, init_expire_timer(env(expired_after), State)}.
{ok, init_expire_timer(proplists:get_value(expired_after, Env, 0), State)}.
init_expire_timer(0, State) ->
State;
@ -164,43 +168,35 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Functions
%%--------------------------------------------------------------------
-spec(retain_message(mqtt_message()) -> ok).
retain_message(Msg = #mqtt_message{topic = Topic}) ->
mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}).
-spec(read_messages(binary()) -> [mqtt_message()]).
read_messages(Topic) ->
[Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
[Msg || #mqtt_retained{msg = Msg} <- mnesia:dirty_read(mqtt_retained, Topic)].
-spec(match_messages(binary()) -> [mqtt_message()]).
match_messages(Filter) ->
%% TODO: optimize later...
Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
Fun = fun(#mqtt_retained{topic = Name, msg = Msg}, Acc) ->
case emqttd_topic:match(Name, Filter) of
true -> [Msg|Acc];
false -> Acc
end
end,
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
-spec(delete_message(binary()) -> ok).
delete_message(Topic) ->
mnesia:dirty_delete(retained_message, Topic).
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], mqtt_retained]).
-spec(expire_messages(pos_integer()) -> any()).
expire_messages(Time) when is_integer(Time) ->
mnesia:transaction(
fun() ->
Match = ets:fun2ms(
fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = Ts}})
fun(#mqtt_retained{topic = Topic, msg = #mqtt_message{timestamp = Ts}})
when Time > Ts -> Topic
end),
Topics = mnesia:select(retained_message, Match, write),
Topics = mnesia:select(mqtt_retained, Match, write),
lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages
(Topic) -> mnesia:delete({retained_message, Topic})
(Topic) -> mnesia:delete({mqtt_retained, Topic})
end, Topics)
end).
-spec(retained_count() -> non_neg_integer()).
retained_count() -> mnesia:table_info(retained_message, size).
retained_count() -> mnesia:table_info(mqtt_retained, size).

View File

@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc emqttd rewrite module
-module(emqttd_mod_rewrite).
-behaviour(emqttd_gen_mod).
@ -23,7 +22,7 @@
-export([load/1, reload/1, unload/1]).
-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]).
-export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]).
%%--------------------------------------------------------------------
%% API
@ -35,18 +34,18 @@ load(Opts) ->
ok;
File ->
{ok, Terms} = file:consult(File), Sections = compile(Terms),
emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]),
emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]),
emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Sections]),
emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/4, [Sections]),
emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections])
end.
rewrite_subscribe({_ClientId, _Username}, {Topic, Opts}, Sections) ->
lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]),
{ok, {match_topic(Topic, Sections), Opts}}.
rewrite_subscribe(_ClientId, _Username, TopicTable, Sections) ->
lager:info("Rewrite subscribe: ~p", [TopicTable]),
{ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_unsubscribe({_ClientId, _Username}, Topic, Sections) ->
lager:info("Rewrite unsubscribe: ~p", [Topic]),
{ok, match_topic(Topic, Sections)}.
rewrite_unsubscribe(_ClientId, _Username, TopicTable, Sections) ->
lager:info("Rewrite unsubscribe: ~p", [TopicTable]),
{ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
%%TODO: this will not work if the client is always online.
@ -71,8 +70,8 @@ reload(File) ->
end.
unload(_) ->
emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/3),
emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3),
emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4),
emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4),
emqttd:unhook('message.publish', fun ?MODULE:rewrite_publish/2).
%%--------------------------------------------------------------------

View File

@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Subscription from Broker Side
-module(emqttd_mod_subscription).
-behaviour(emqttd_gen_mod).
@ -33,7 +32,7 @@ on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId
client_pid = ClientPid,
username = Username}, Topics) ->
Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end,
Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end,
TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics],
emqttd_client:subscribe(ClientPid, TopicTable),
{ok, Client};
@ -44,10 +43,10 @@ on_client_connected(_ConnAck, _Client, _State) ->
unload(_Opts) ->
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3).
rep(<<"$c">>, ClientId, Topic) ->
emqttd_topic:feed_var(<<"$c">>, ClientId, Topic);
rep(<<"$u">>, undefined, Topic) ->
rep(<<"%c">>, ClientId, Topic) ->
emqttd_topic:feed_var(<<"%c">>, ClientId, Topic);
rep(<<"%u">>, undefined, Topic) ->
Topic;
rep(<<"$u">>, Username, Topic) ->
emqttd_topic:feed_var(<<"$u">>, Username, Topic).
rep(<<"%u">>, Username, Topic) ->
emqttd_topic:feed_var(<<"%u">>, Username, Topic).

View File

@ -21,7 +21,7 @@
-include("emqttd.hrl").
%% API
-export([start_link/0, start_child/1, start_child/2]).
-export([start_link/0, start_child/1, start_child/2, stop_child/1]).
%% Supervisor callbacks
-export([init/1]).
@ -46,6 +46,13 @@ start_child(ChildSpec) when is_tuple(ChildSpec) ->
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
-spec(stop_child(any()) -> ok | {error, any()}).
stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId);
Error -> Error
end.
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------

View File

@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Protocol Processor.
-module(emqttd_protocol).
-include("emqttd.hrl").
@ -28,7 +27,7 @@
%% API
-export([init/3, info/1, clientid/1, client/1, session/1]).
-export([received/2, send/2, redeliver/2, shutdown/2]).
-export([received/2, handle/2, send/2, redeliver/2, shutdown/2]).
-export([process/2]).
@ -116,6 +115,26 @@ received(Packet = ?PACKET(_Type), State) ->
{error, Reason, State}
end.
handle({subscribe, RawTopicTable}, ProtoState = #proto_state{client_id = ClientId,
username = Username,
session = Session}) ->
TopicTable = parse_topic_table(RawTopicTable),
case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of
{ok, TopicTable1} ->
emqttd_session:subscribe(Session, TopicTable1);
{stop, _} ->
ok
end,
{ok, ProtoState};
handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId,
username = Username,
session = Session}) ->
{ok, TopicTable} = emqttd:run_hooks('client.unsubscribe',
[ClientId, Username], parse_topics(RawTopics)),
emqttd_session:unsubscribe(Session, TopicTable),
{ok, ProtoState}.
process(Packet = ?CONNECT_PACKET(Var), State0) ->
#mqtt_packet_connect{proto_ver = ProtoVer,
@ -197,23 +216,32 @@ process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessi
process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
send(?SUBACK_PACKET(PacketId, []), State);
process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{
client_id = ClientId, username = Username, session = Session}) ->
Client = client(State),
AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
TopicTable = parse_topic_table(RawTopicTable),
AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable],
case lists:member(deny, AllowDenies) of
true ->
?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
false ->
emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of
{ok, TopicTable1} ->
emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State};
{stop, _} ->
{ok, State}
end
end;
%% Protect from empty topic list
process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State);
process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
emqttd_session:unsubscribe(Session, Topics),
process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), State = #proto_state{
client_id = ClientId, username = Username, session = Session}) ->
{ok, TopicTable} = emqttd:run_hooks('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)),
emqttd_session:unsubscribe(Session, TopicTable),
send(?UNSUBACK_PACKET(PacketId), State);
process(?PACKET(?PINGREQ), State) ->
@ -249,7 +277,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
-spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
send(Msg, State = #proto_state{client_id = ClientId, username = Username})
when is_record(Msg, mqtt_message) ->
emqttd:run_hooks('message.delivered', [{ClientId, Username}], Msg),
emqttd:run_hooks('message.delivered', [ClientId, Username], Msg),
send(emqttd_message:to_packet(Msg), State);
send(Packet, State = #proto_state{sendfun = SendFun})
@ -393,6 +421,15 @@ validate_qos(Qos) when ?IS_QOS(Qos) ->
validate_qos(_) ->
false.
parse_topic_table(TopicTable) ->
lists:map(fun({Topic0, Qos}) ->
{Topic, Opts} = emqttd_topic:parse(Topic0),
{Topic, [{qos, Qos}|Opts]}
end, TopicTable).
parse_topics(Topics) ->
[emqttd_topic:parse(Topic) || Topic <- Topics].
%% PUBLISH ACL is cached in process dictionary.
check_acl(publish, Topic, Client) ->
IfCache = emqttd:conf(cache_acl, true),
@ -412,4 +449,3 @@ check_acl(subscribe, Topic, Client) ->
sp(true) -> 1;
sp(false) -> 0.

View File

@ -91,12 +91,7 @@ publish(Msg = #mqtt_message{from = From}) ->
trace(publish, From, Msg),
case emqttd_hook:run('message.publish', [], Msg) of
{ok, Msg1 = #mqtt_message{topic = Topic}} ->
%% Retain message first. Don't create retained topic.
Msg2 = case emqttd_retainer:retain(Msg1) of
ok -> emqttd_message:unset_flag(Msg1);
ignore -> Msg1
end,
emqttd_pubsub:publish(Topic, Msg2);
emqttd_pubsub:publish(Topic, Msg1);
{stop, Msg1} ->
lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]),
ignore

View File

@ -84,7 +84,7 @@
packet_id = 1,
%% Clients subscriptions.
subscriptions :: dict:dict(),
subscriptions :: map(),
%% Inflight qos1, qos2 messages sent to the client but unacked,
%% QoS 1 and QoS 2 messages which have been sent to the Client,
@ -162,16 +162,14 @@ destroy(SessPid, ClientId) ->
%%--------------------------------------------------------------------
%% @doc Subscribe Topics
-spec(subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok).
-spec(subscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok).
subscribe(SessPid, TopicTable) ->
gen_server2:cast(SessPid, {subscribe, TopicTable, fun(_) -> ok end}).
-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok).
subscribe(SessPid, PacketId, TopicTable) ->
From = self(),
AckFun = fun(GrantedQos) ->
From ! {suback, PacketId, GrantedQos}
end,
-spec(subscribe(pid(), mqtt_pktid(), [{binary(), [emqttd_topic:option()]}]) -> ok).
subscribe(SessPid, PktId, TopicTable) ->
From = self(),
AckFun = fun(GrantedQos) -> From ! {suback, PktId, GrantedQos} end,
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
%% @doc Publish message
@ -206,9 +204,9 @@ pubcomp(SessPid, PktId) ->
gen_server2:cast(SessPid, {pubcomp, PktId}).
%% @doc Unsubscribe Topics
-spec(unsubscribe(pid(), [binary()]) -> ok).
unsubscribe(SessPid, Topics) ->
gen_server2:cast(SessPid, {unsubscribe, Topics}).
-spec(unsubscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok).
unsubscribe(SessPid, TopicTable) ->
gen_server2:cast(SessPid, {unsubscribe, TopicTable}).
%%--------------------------------------------------------------------
%% gen_server Callbacks
@ -223,7 +221,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
client_id = ClientId,
client_pid = ClientPid,
username = Username,
subscriptions = dict:new(),
subscriptions = #{},
inflight_queue = [],
max_inflight = get_value(max_inflight, SessEnv, 0),
message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()),
@ -250,10 +248,10 @@ prioritise_cast(Msg, _Len, _State) ->
case Msg of
{destroy, _} -> 10;
{resume, _, _} -> 9;
{pubrel, _PktId} -> 8;
{pubcomp, _PktId} -> 8;
{pubrec, _PktId} -> 8;
{puback, _PktId} -> 7;
{pubrel, _} -> 8;
{pubcomp, _} -> 8;
{pubrec, _} -> 8;
{puback, _} -> 7;
{unsubscribe, _, _} -> 6;
{subscribe, _, _} -> 5;
_ -> 0
@ -288,67 +286,48 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
%%TODO: 2.0 FIX
handle_cast({subscribe, TopicTable, AckFun}, Session = #session{client_id = ClientId,
username = Username,
subscriptions = Subscriptions}) ->
?LOG(info, "Subscribe ~p", [TopicTable], Session),
{GrantedQos, Subscriptions1} =
lists:foldl(fun({RawTopic, Qos}, {QosAcc, SubDict}) ->
{Topic, Opts} = emqttd_topic:strip(RawTopic),
case emqttd:run_hooks('client.subscribe', [{ClientId, Username}], {Topic, Opts}) of
{ok, {Topic1, Opts1}} ->
NewQos = proplists:get_value(qos, Opts1, Qos),
{[NewQos | QosAcc], case dict:find(Topic, SubDict) of
{ok, NewQos} ->
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session),
SubDict;
{ok, OldQos} ->
emqttd:setqos(Topic, ClientId, NewQos),
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], Session),
dict:store(Topic, NewQos, SubDict);
error ->
emqttd:subscribe(Topic1, ClientId, Opts1),
%%TODO: the design is ugly...
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
emqttd_retainer:dispatch(Topic1, self()),
emqttd:run_hooks('client.subscribe.after', [{ClientId, Username}], {Topic1, Opts1}),
dict:store(Topic1, NewQos, SubDict)
end};
{stop, _} ->
?LOG(error, "Cannot subscribe: ~p", [Topic], Session),
{[128 | QosAcc], SubDict}
end
end, {[], Subscriptions}, TopicTable),
lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) ->
NewQos = proplists:get_value(qos, Opts),
SubMap1 =
case maps:find(Topic, SubMap) of
{ok, NewQos} ->
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session),
SubMap;
{ok, OldQos} ->
emqttd:setqos(Topic, ClientId, NewQos),
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w",
[Topic, OldQos, NewQos], Session),
maps:put(Topic, NewQos, SubMap);
error ->
emqttd:subscribe(Topic, ClientId, Opts),
emqttd:run_hooks('session.subscribed', [ClientId, Username], {Topic, Opts}),
maps:put(Topic, NewQos, SubMap)
end,
{[NewQos|QosAcc], SubMap1}
end, {[], Subscriptions}, TopicTable),
AckFun(lists:reverse(GrantedQos)),
hibernate(Session#session{subscriptions = Subscriptions1});
%%TODO: 2.0 FIX
handle_cast({unsubscribe, Topics}, Session = #session{client_id = ClientId,
username = Username,
subscriptions = Subscriptions}) ->
?LOG(info, "unsubscribe ~p", [Topics], Session),
handle_cast({unsubscribe, TopicTable}, Session = #session{client_id = ClientId,
username = Username,
subscriptions = Subscriptions}) ->
?LOG(info, "unsubscribe ~p", [TopicTable], Session),
Subscriptions1 =
lists:foldl(fun(RawTopic, SubDict) ->
{Topic0, _Opts} = emqttd_topic:strip(RawTopic),
case emqttd:run_hooks('client.unsubscribe', [{ClientId, Username}], Topic0) of
{ok, Topic1} ->
case dict:find(Topic1, SubDict) of
{ok, _Qos} ->
emqttd:unsubscribe(Topic1, ClientId),
dict:erase(Topic1, SubDict);
error ->
SubDict
end;
{stop, _} ->
SubDict
end
end, Subscriptions, Topics),
lists:foldl(fun({Topic, Opts}, SubMap) ->
case maps:find(Topic, SubMap) of
{ok, _Qos} ->
emqttd:unsubscribe(Topic, ClientId),
emqttd:run_hooks('session.unsubscribed', [ClientId, Username], {Topic, Opts}),
maps:remove(Topic, SubMap);
error ->
SubMap
end
end, Subscriptions, TopicTable),
hibernate(Session#session{subscriptions = Subscriptions1});
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
@ -582,8 +561,8 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end.
tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
case dict:find(Topic, Subscriptions) of
tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, SubMap) ->
case maps:find(Topic, SubMap) of
{ok, SubQos} when PubQos > SubQos ->
Msg#mqtt_message{qos = SubQos};
{ok, _SubQos} ->
@ -664,7 +643,7 @@ acked(PktId, Session = #session{client_id = ClientId,
awaiting_ack = Awaiting}) ->
case lists:keyfind(PktId, 1, InflightQ) of
{_, Msg} ->
emqttd:run_hooks('message.acked', [{ClientId, Username}], Msg);
emqttd:run_hooks('message.acked', [ClientId, Username], Msg);
false ->
?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session)
end,

View File

@ -16,23 +16,27 @@
-module(emqttd_topic).
-include("emqttd_protocol.hrl").
-import(lists, [reverse/1]).
-export([match/2, validate/1, triples/1, words/1, wildcard/1]).
-export([join/1, feed_var/3, systop/1]).
-export([strip/1, strip/2]).
-export([parse/1, parse/2]).
-type(topic() :: binary()).
-type(option() :: local | {qos, mqtt_qos()} | {share, '$queue' | binary()}).
-type(word() :: '' | '+' | '#' | binary()).
-type(words() :: list(word())).
-type(triple() :: {root | binary(), word(), binary()}).
-export_type([topic/0, word/0, triple/0]).
-export_type([topic/0, option/0, word/0, triple/0]).
-define(MAX_TOPIC_LEN, 4096).
@ -172,28 +176,28 @@ join(Words) ->
end, {true, <<>>}, [bin(W) || W <- Words]),
Bin.
-spec(strip(topic()) -> {topic(), [local | {share, binary()}]}).
strip(Topic) when is_binary(Topic) ->
strip(Topic, []).
-spec(parse(topic()) -> {topic(), [option()]}).
parse(Topic) when is_binary(Topic) ->
parse(Topic, []).
strip(Topic = <<"$local/", Topic1/binary>>, Options) ->
parse(Topic = <<"$local/", Topic1/binary>>, Options) ->
case lists:member(local, Options) of
true -> error({invalid_topic, Topic});
false -> strip(Topic1, [local | Options])
false -> parse(Topic1, [local | Options])
end;
strip(Topic = <<"$queue/", Topic1/binary>>, Options) ->
parse(Topic = <<"$queue/", Topic1/binary>>, Options) ->
case lists:keyfind(share, 1, Options) of
{share, _} -> error({invalid_topic, Topic});
false -> strip(Topic1, [{share, '$queue'} | Options])
false -> parse(Topic1, [{share, '$queue'} | Options])
end;
strip(Topic = <<"$share/", Topic1/binary>>, Options) ->
parse(Topic = <<"$share/", Topic1/binary>>, Options) ->
case lists:keyfind(share, 1, Options) of
{share, _} -> error({invalid_topic, Topic});
false -> [Share, Topic2] = binary:split(Topic1, <<"/">>),
{Topic2, [{share, Share} | Options]}
end;
strip(Topic, Options) -> {Topic, Options}.
parse(Topic, Options) -> {Topic, Options}.

View File

@ -97,14 +97,14 @@ handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
{reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State) ->
with_session(fun(SessPid) ->
emqttd_session:subscribe(SessPid, TopicTable)
end, State);
with_proto_state(fun(ProtoState) ->
emqttd_protocol:handle({subscribe, TopicTable}, ProtoState)
end, State);
handle_cast({unsubscribe, Topics}, State) ->
with_session(fun(SessPid) ->
emqttd_session:unsubscribe(SessPid, Topics)
end, State);
with_proto_state(fun(ProtoState) ->
emqttd_protocol:handle({unsubscribe, Topics}, ProtoState)
end, State);
handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
case emqttd_protocol:received(Packet, ProtoState) of
@ -194,9 +194,6 @@ with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
noreply(State#wsclient_state{proto_state = ProtoState1}).
with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)), noreply(State).
noreply(State) ->
{noreply, State, hibernate}.

View File

@ -29,7 +29,7 @@ all() ->
{group, pubsub},
{group, router},
{group, session},
{group, retainer},
%%{group, retainer},
{group, broker},
{group, metrics},
{group, stats},
@ -60,8 +60,6 @@ groups() ->
{hook, [sequence],
[add_delete_hook,
run_hooks]},
{retainer, [sequence],
[dispatch_retained_messages]},
{backend, [sequence],
[]},
{http, [sequence],
@ -237,10 +235,10 @@ start_session(_) ->
Message1 = Message#mqtt_message{pktid = 1},
emqttd_session:publish(SessPid, Message1),
emqttd_session:pubrel(SessPid, 1),
emqttd_session:subscribe(SessPid, [{<<"topic/session">>, 2}]),
emqttd_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),
Message2 = emqttd_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
emqttd_session:publish(SessPid, Message2),
emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]),
emqttd_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
emqttd_mock_client:stop(ClientPid).
%%--------------------------------------------------------------------
@ -303,20 +301,6 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
%%--------------------------------------------------------------------
%% Retainer Test
%%--------------------------------------------------------------------
dispatch_retained_messages(_) ->
Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>,
payload = <<"payload">>},
emqttd_retainer:retain(Msg),
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end),
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
[] = emqttd_retainer:read_messages(<<"a/b/c">>).
%%--------------------------------------------------------------------
%% HTTP Request Test
%%--------------------------------------------------------------------

View File

@ -54,19 +54,6 @@
%% System interval of publishing broker $SYS messages
{broker_sys_interval, 60}.
%%--------------------------------------------------------------------
%% Retained message
%%--------------------------------------------------------------------
%% Expired after seconds, never expired if 0
{retained_expired_after, 0}.
%% Max number of retained messages
{retained_max_message_num, 100000}.
%% Max Payload Size of retained message
{retained_max_playload_size, 65536}.
%%--------------------------------------------------------------------
%% Session
%%--------------------------------------------------------------------
@ -232,6 +219,23 @@
%% Modules
%%--------------------------------------------------------------------
%% Retainer Module
{module, retainer, [
%% disc: disc_copies, ram: ram_copies
{storage, ram},
%% Max number of retained messages
{max_message_num, 100000},
%% Max Payload Size of retained message
{max_playload_size, 65536},
%% Expired after seconds, never expired if 0
{expired_after, 0}
]}.
%% Client presence management module. Publish presence messages when
%% client connected or disconnected.
{module, presence, [{qos, 0}]}.

View File

@ -52,8 +52,8 @@ prepare_config() ->
{allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]},
{allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]},
{allow, {client, "testClient"}, subscribe, ["testTopics/testClient"]},
{allow, all, subscribe, ["clients/$c"]},
{allow, all, pubsub, ["users/$u/#"]},
{allow, all, subscribe, ["clients/%c"]},
{allow, all, pubsub, ["users/%u/#"]},
{deny, all, subscribe, ["$SYS/#", "#"]},
{deny, all}],
write_config("access_SUITE_acl.conf", Rules),
@ -151,10 +151,10 @@ compile_rule(_) ->
compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}),
{allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} =
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}),
{allow, all, pubsub, [{pattern, [<<"clients">>, <<"$c">>]}]} =
compile({allow, all, pubsub, ["clients/$c"]}),
{allow, all, subscribe, [{pattern, [<<"users">>, <<"$u">>, '#']}]} =
compile({allow, all, subscribe, ["users/$u/#"]}),
{allow, all, pubsub, [{pattern, [<<"clients">>, <<"%c">>]}]} =
compile({allow, all, pubsub, ["clients/%c"]}),
{allow, all, subscribe, [{pattern, [<<"users">>, <<"%u">>, '#']}]} =
compile({allow, all, subscribe, ["users/%u/#"]}),
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
{allow, all} = compile({allow, all}),
@ -171,9 +171,9 @@ match_rule(_) ->
{matched, allow} = match(User, <<"d/e/f/x">>, compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})),
nomatch = match(User, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
{matched, allow} = match(User, <<"testTopics/testClient">>, compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
{matched, allow} = match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/$c"]})),
{matched, allow} = match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})),
{matched, allow} = match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>,
compile({allow, all, subscribe, ["users/$u/#"]})),
compile({allow, all, subscribe, ["users/%u/#"]})),
{matched, deny} = match(User, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
nomatch = match(User, <<"Topic">>, Rule),

View File

@ -22,14 +22,14 @@
-compile(export_all).
-import(emqttd_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
words/1, systop/1, feed_var/3, strip/1, strip/2]).
words/1, systop/1, feed_var/3, parse/1, parse/2]).
-define(N, 10000).
all() -> [t_wildcard, t_match, t_match2, t_validate, t_triples, t_join,
t_words, t_systop, t_feed_var, t_sys_match, 't_#_match',
t_sigle_level_validate, t_sigle_level_match, t_match_perf,
t_triples_perf, t_strip].
t_triples_perf, t_parse].
t_wildcard(_) ->
true = wildcard(<<"a/b/#">>),
@ -171,11 +171,11 @@ t_feed_var(_) ->
long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).
t_strip(_) ->
?assertEqual({<<"a/b/+/#">>, []}, strip(<<"a/b/+/#">>)),
?assertEqual({<<"topic">>, [{share, '$queue'}]}, strip(<<"$queue/topic">>)),
?assertEqual({<<"topic">>, [{share, <<"group">>}]}, strip(<<"$share/group/topic">>)),
?assertEqual({<<"topic">>, [local]}, strip(<<"$local/topic">>)),
?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, strip(<<"$local/$queue/topic">>)),
?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, strip(<<"$local/$share/group//a/b/c">>)).
t_parse(_) ->
?assertEqual({<<"a/b/+/#">>, []}, parse(<<"a/b/+/#">>)),
?assertEqual({<<"topic">>, [{share, '$queue'}]}, parse(<<"$queue/topic">>)),
?assertEqual({<<"topic">>, [{share, <<"group">>}]}, parse(<<"$share/group/topic">>)),
?assertEqual({<<"topic">>, [local]}, parse(<<"$local/topic">>)),
?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, parse(<<"$local/$share/group//a/b/c">>)).