diff --git a/.gitignore b/.gitignore index 27a7afb52..19797d37d 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ log/ *.swp *.so examples +docs/build/* diff --git a/.gitmodules b/.gitmodules index 3e84cb96d..17e1be420 100644 --- a/.gitmodules +++ b/.gitmodules @@ -19,3 +19,6 @@ [submodule "plugins/emqttd_recon"] path = plugins/emqttd_recon url = https://github.com/emqtt/emqttd_recon.git +[submodule "plugins/emqttd_plugin_redis"] + path = plugins/emqttd_plugin_redis + url = https://github.com/emqtt/emqttd_plugin_redis.git diff --git a/CHANGELOG.md b/CHANGELOG.md index f2cc31ca6..01e572b81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,52 @@ emqttd ChangeLog ================== +0.15.0-beta(2016-01-28) +------------------------- + +#### Highlights + +Optimization for Route ETS insertion (#427) + +Add Mongodb, Redis Plugins + +Priority Message Queue Support + +ReadTheDocs + + +#### Enhancements + +Join/Leave the Cluster + +Username Authentication: Default Users + +Improve Cli commands: pubsub, bridges, trace + +emqttd_mod_subscription: fix client_connected/3 + + +#### BugFix + +Fix dequeue/1 of emqttd_bridge... + +Add emqttd:seed_now/0 function + + +#### Plugins + +emqttd_plubin_mysql: changed mysql driver to mysql-otp + +emqttd_plugin_pgsql: integrate with ecpool + +emqttd_plugin_redis: first release + +emqttd_plugin_mongo: first release + + +#### Benchmark + + 0.14.1-beta(2015-12-28) ------------------------- diff --git a/LICENSE b/LICENSE index a9a41df13..cd6ef5565 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 300a80bb3..5d3da07e8 100644 --- a/README.md +++ b/README.md @@ -5,11 +5,9 @@ emqttd is a massively scalable and clusterable MQTT V3.1/V3.1.1 broker written i emqttd requires Erlang R17+ to build. -**DON'T compile the broker with Erlang/OTP R18.0 which introduced a [binary memory leak](http://erlang.org/pipermail/erlang-questions/2015-September/086098.html).** - Demo Server: tcp://t.emqtt.io:1883 -Twitter: [@emqtt](https://twitter.com/emqtt) +Follow us on Twitter: [@emqtt](https://twitter.com/emqtt) ## Goals diff --git a/doc/MQTT_V3.1_Protocol_Specific.pdf b/doc/MQTT_V3.1_Protocol_Specific.pdf deleted file mode 100644 index 2ea1b2cab..000000000 Binary files a/doc/MQTT_V3.1_Protocol_Specific.pdf and /dev/null differ diff --git a/doc/design/Architecture.png b/doc/design/Architecture.png deleted file mode 100644 index 77c7dcc9e..000000000 Binary files a/doc/design/Architecture.png and /dev/null differ diff --git a/doc/design/ClientSession.md b/doc/design/ClientSession.md deleted file mode 100644 index c7f6f53d2..000000000 --- a/doc/design/ClientSession.md +++ /dev/null @@ -1,55 +0,0 @@ -## Transient Client/Session Sequence1 - -``` -Client1->SM: {start_session, {true, ClientId, self()}} -SM-->Session: {destory, ClientId} -Session-->Session: {shutdown, destroy} -Session-->Client2: exit({shutdown, destroy}) -Client2-->CM: {'DOWN', MRef, process, Pid, Reason} -SM-->Client1: {ok, SessPid} -Client1-->CM: {register, Client1} -``` - -![Transient Client/Session Sequence1](TransientSessionSeq1.png) - -## Transient Client/Session Sequence2 - - -``` -Client1->SM: {start_session, {true, ClientId, self()}} -SM-->Session: {destory, ClientId} -Session-->Session: {shutdown, destroy} -SM-->Client1: {ok, SessPid} -Client1-->CM: {register, Client1} -Session-->Client2: exit({shutdown, destroy}) -Client2-->CM: {'DOWN', MRef, process, Pid, Reason} -``` - -![Transient Client/Session Sequence2](TransientSessionSeq2.png) - -## Persistent Client/Session Sequence1 - -``` -Client1->SM: {start_session, {true, ClientId, self()}} -SM-->Session: {resume, ClientId, ClientPid} -Session-->Client2: {shutdown, conflict, {ClientId, Pid}} -Client2-->CM: {unregister, ClientId, self()} -SM-->Client1: {ok, SessPid} -Client1-->CM: {register, Client1} -``` - -![Persistent Client/Session Sequence1](PersistentSessionSeq1.png) - - -## Persistent Client/Session Sequence2 - -``` -Client1->SM: {start_session, {true, ClientId, self()}} -SM-->Session: {resume, ClientId, ClientPid} -SM-->Client1: {ok, SessPid} -Client1-->CM: {register, Client1} -Session-->Client2: {shutdown, conflict, {ClientId, Pid}} -Client2-->CM: {unregister, ClientId, self()} -``` - -![Persistent Client/Session Sequence2](PersistentSessionSeq2.png) diff --git a/doc/design/Design_Cluster.graphml b/doc/design/Design_Cluster.graphml deleted file mode 100644 index 0e1e06791..000000000 --- a/doc/design/Design_Cluster.graphml +++ /dev/null @@ -1,847 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - emqttd cluster - - - - - - - - - - Folder 1 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - P - - - - - - - - - - - - - - - - - - P - - - - - - - - - - - - - - - - - - S - - - - - - - - - - - - - - - - - - S - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/doc/design/Design_Cluster.png b/doc/design/Design_Cluster.png deleted file mode 100644 index 77c7dcc9e..000000000 Binary files a/doc/design/Design_Cluster.png and /dev/null differ diff --git a/doc/design/Design_Standalone.graphml b/doc/design/Design_Standalone.graphml deleted file mode 100644 index 9382a66f6..000000000 --- a/doc/design/Design_Standalone.graphml +++ /dev/null @@ -1,274 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - P - - - - - - - - - - - - - - - - - - P - - - - - - - - - - - - - - - - - - S - - - - - - - - - - - - - - - - - - S - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/doc/design/Design_Standalone.png b/doc/design/Design_Standalone.png deleted file mode 100644 index 992694523..000000000 Binary files a/doc/design/Design_Standalone.png and /dev/null differ diff --git a/doc/design/PersistentSessionSeq1.png b/doc/design/PersistentSessionSeq1.png deleted file mode 100644 index 07e3b9856..000000000 Binary files a/doc/design/PersistentSessionSeq1.png and /dev/null differ diff --git a/doc/design/PersistentSessionSeq2.png b/doc/design/PersistentSessionSeq2.png deleted file mode 100644 index 5a731c230..000000000 Binary files a/doc/design/PersistentSessionSeq2.png and /dev/null differ diff --git a/doc/design/PubSub_CleanSess_0.png b/doc/design/PubSub_CleanSess_0.png deleted file mode 100644 index 831d25f8a..000000000 Binary files a/doc/design/PubSub_CleanSess_0.png and /dev/null differ diff --git a/doc/design/PubSub_CleanSess_1.png b/doc/design/PubSub_CleanSess_1.png deleted file mode 100644 index bd6227e3a..000000000 Binary files a/doc/design/PubSub_CleanSess_1.png and /dev/null differ diff --git a/doc/design/Seq.graphml b/doc/design/Seq.graphml deleted file mode 100644 index 8ddc9353a..000000000 --- a/doc/design/Seq.graphml +++ /dev/null @@ -1,234 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - T - - - - - - - - - - - - - - - - - - C1 - - - - - - - - - - - - - - - - - - S1 - - - - - - - - - - - - - - - - - - C2 - - - - - - - - - - - - - - - - - - S2 - - - - - - - - - - - - - - - - - - Queue - - - - - - - - - - - - - - - - - - Queue - - - - - - - - - - - - - - - - - - Dispatch - - - - - - - - - - - - - Deliver - - - - - - - - - - - - - Publish QoS1/2 - - - - - - - - - - - - - - - - - - - - - - - - - - Publish Qos0 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/doc/design/Seq.md b/doc/design/Seq.md deleted file mode 100644 index 924de84d3..000000000 --- a/doc/design/Seq.md +++ /dev/null @@ -1,14 +0,0 @@ -## QoS0 Publish Sequence - - -``` -title QoS0 Publish Sequence - -C1->PubSub: Publish QoS0 -PubSub-->S2: Dispatch QoS0 -S2-->C2: Deliver QoS0 -``` - -## QoS1 Publish Sequence - - diff --git a/doc/design/Seq.png b/doc/design/Seq.png deleted file mode 100644 index b77f682a1..000000000 Binary files a/doc/design/Seq.png and /dev/null differ diff --git a/doc/design/TransientSessionSeq1.png b/doc/design/TransientSessionSeq1.png deleted file mode 100644 index c15e24435..000000000 Binary files a/doc/design/TransientSessionSeq1.png and /dev/null differ diff --git a/doc/design/TransientSessionSeq2.png b/doc/design/TransientSessionSeq2.png deleted file mode 100644 index 210b9e526..000000000 Binary files a/doc/design/TransientSessionSeq2.png and /dev/null differ diff --git a/doc/design/qos0_seq.png b/doc/design/qos0_seq.png deleted file mode 100644 index a1ca32c0d..000000000 Binary files a/doc/design/qos0_seq.png and /dev/null differ diff --git a/doc/emqttd.graphml b/doc/emqttd.graphml deleted file mode 100644 index 0add31baf..000000000 --- a/doc/emqttd.graphml +++ /dev/null @@ -1,993 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - emqttd broker cluster - - - - - - - - - - Folder 1 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - x - - - - - - - - - - - - - - - - - y - - - - - - - - - - - - - - - - - t - - - - - - - - - - - - - - - - - - - Client - - - - - - - - - - - - - - - - - Sensor - - - - - - - - - - - - - - - - - Sensor - - - - - - - - - - - - - - - - - Client - - - - - - - - - - - - - - - - - - Application Server - - - - - - - - - - - - - - - - - - Web - - - - - - - - - - - - - - - - - - Web - - - - - - - - - - - - - - - - - - iPhone - - - - - - - - - - - - - - - - - - Android - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - MQTT - - - - - - - - - - - - - - - - - - - HTTP - - - - - - - - - - - - - - - - - - - MQTT - - - - - - - - - - - - - - - - - - - WebSocket - - - - - - - - - - - - - - - - - - - WebSocket - - - - - - - - - - - - - - - - - - - MQTT - - - - - - - - - - - - - - - - - - - MQTT - - - - - - - - - - - - - - - - - - - MQTT - - - - - - - - - - - - - - - - diff --git a/doc/emqttd.png b/doc/emqttd.png deleted file mode 100644 index 6066d5d08..000000000 Binary files a/doc/emqttd.png and /dev/null differ diff --git a/doc/mqtt-v3.1.1-os.pdf b/doc/mqtt-v3.1.1-os.pdf deleted file mode 100644 index 03a935fdd..000000000 Binary files a/doc/mqtt-v3.1.1-os.pdf and /dev/null differ diff --git a/doc/pool.md b/doc/pool.md deleted file mode 100644 index 1f032fd55..000000000 --- a/doc/pool.md +++ /dev/null @@ -1,7 +0,0 @@ -sup(one_for_all) - manager - pool_sup(one_for_one) - worker1 - worker2 - ... - workerN diff --git a/doc/rfc6455.pdf b/doc/rfc6455.pdf deleted file mode 100644 index 74625160e..000000000 Binary files a/doc/rfc6455.pdf and /dev/null differ diff --git a/doc/uuid.md b/doc/uuid.md deleted file mode 100644 index 5b7bd0aa0..000000000 --- a/doc/uuid.md +++ /dev/null @@ -1,19 +0,0 @@ -## Mongodb ObjectId - -* 4-byte value representing the seconds since the Unix epoch, -* 3-byte machine identifier, -* 2-byte process id, and -* 3-byte counter, starting with a random value. - -## Flake Id - -* 64bits Timestamp -* 48bits WorkerId -* 16bits Sequence - -## emqttd Id - -* 64bits Timestamp: erlang:now(), erlang:system_time -* 48bits (node+pid): Node + Pid -> Integer -* 16bits Sequence: PktId - diff --git a/docs/.placeholder b/docs/.placeholder new file mode 100644 index 000000000..e69de29bb diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 2f7b9b5c4..1163bc323 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -28,7 +28,7 @@ %%------------------------------------------------------------------------------ %% Banner %%------------------------------------------------------------------------------ --define(COPYRIGHT, "Copyright (C) 2012-2015, Feng Lee "). +-define(COPYRIGHT, "Copyright (C) 2012-2016, Feng Lee "). -define(LICENSE_MESSAGE, "Licensed under MIT"). diff --git a/include/emqttd_cli.hrl b/include/emqttd_cli.hrl index 55d7d1739..8a0be700a 100644 --- a/include/emqttd_cli.hrl +++ b/include/emqttd_cli.hrl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index 8d6d4c515..4a95d1d6b 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index a335e8de7..00635e3c2 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/plugins/emqttd_plugin_redis b/plugins/emqttd_plugin_redis new file mode 160000 index 000000000..c6a532d49 --- /dev/null +++ b/plugins/emqttd_plugin_redis @@ -0,0 +1 @@ +Subproject commit c6a532d49d2b479551bfd3b8d278d40c99e96ae3 diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index e50acbf52..93bacd972 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -85,9 +85,6 @@ {client, [ %% Socket is connected, but no 'CONNECT' packet received {idle_timeout, 10} %% seconds - %TODO: Network ingoing limit - %{ingoing_rate_limit, '64KB/s'} - %TODO: Reconnet control ]}, %% Session {session, [ @@ -111,11 +108,17 @@ {expired_after, 48} ]}, - %% Session + %% Queue {queue, [ - %% Max queue length. enqueued messages when persistent client disconnected, + %% simple | priority + {type, simple}, + + %% Topic Priority: 0~255, Default is 0 + %% {priority, [{"topic/1", 10}, {"topic/2", 8}]}, + + %% Max queue length. Enqueued messages when persistent client disconnected, %% or inflight window is full. - {max_length, 100}, + {max_length, infinity}, %% Low-water mark of queued messages {low_watermark, 0.2}, @@ -147,7 +150,7 @@ %% PubSub and Router {pubsub, [ %% Default should be scheduler numbers - %% {pool_size, 8}, + {pool_size, 8}, %% Subscription: disc | ram | false {subscription, ram}, diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 9ef73e771..dc5a171d2 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -103,11 +103,17 @@ {expired_after, 48} ]}, - %% Session + %% Queue {queue, [ - %% Max queue length. enqueued messages when persistent client disconnected, + %% simple | priority + {type, simple}, + + %% Topic Priority: 0~255, Default is 0 + %% {priority, [{"topic/1", 10}, {"topic/2", 8}]}, + + %% Max queue length. Enqueued messages when persistent client disconnected, %% or inflight window is full. - {max_length, 100}, + {max_length, infinity}, %% Low-water mark of queued messages {low_watermark, 0.2}, @@ -139,7 +145,7 @@ %% PubSub and Router {pubsub, [ %% Default should be scheduler numbers - %% {pool_size, 8}, + {pool_size, 8}, %% Subscription: disc | ram | false {subscription, ram}, diff --git a/rel/files/vm.args b/rel/files/vm.args index 399b76d8f..245e369c6 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -46,6 +46,7 @@ -env ERTS_MAX_PORTS 8192 +## Mnesia and SSL will create temporary ets tables. -env ERL_MAX_ETS_TABLES 1024 ## Tweak GC to run more often diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 5ab6896cc..65c4f7ce4 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -13,4 +13,3 @@ {mod, {emqttd_app, []}}, {env, []} ]}. - diff --git a/src/emqttd.erl b/src/emqttd.erl index b99869025..4e9bd42d7 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -23,12 +23,16 @@ %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- + -module(emqttd). -export([start/0, env/1, env/2, start_listeners/0, stop_listeners/0, load_all_mods/0, is_mod_enabled/1, - is_running/1]). + is_running/1, seed_now/0]). + +%% Utility functions. +-export([reg_name/2]). -define(MQTT_SOCKOPTS, [ binary, @@ -122,10 +126,8 @@ load_mod({Name, Opts}) -> is_mod_enabled(Name) -> env(modules, Name) =/= undefined. -%%------------------------------------------------------------------------------ %% @doc Is running? -%% @end -%%------------------------------------------------------------------------------ +-spec is_running(node()) -> boolean(). is_running(Node) -> case rpc:call(Node, erlang, whereis, [?APP]) of {badrpc, _} -> false; @@ -133,3 +135,16 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. +-spec reg_name(module(), pos_integer()) -> atom(). +reg_name(M, Id) when is_atom(M), is_integer(Id) -> + list_to_atom(lists:concat([M, "_", Id])). + +seed_now() -> + case erlang:function_exported(erlang, timestamp, 0) of + true -> %% R18 + random:seed(erlang:timestamp()); + false -> + %% compress 'now()' warning... + random:seed(os:timestamp()) + end. + diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 5dcc78e27..0e6e7645b 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 685f98fd7..3d62fd65f 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index ab0f3c0ad..1b5932e02 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_acl_mod.erl b/src/emqttd_acl_mod.erl index f44479599..dcc07a830 100644 --- a/src/emqttd_acl_mod.erl +++ b/src/emqttd_acl_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index f22621b75..f16e13d45 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 7cfb10bd0..d23046427 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index 92e9da707..9f0d3167f 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 2ed60e1b6..84eb2af14 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_auth_ldap.erl b/src/emqttd_auth_ldap.erl index d70aeaf47..9ef39e89f 100644 --- a/src/emqttd_auth_ldap.erl +++ b/src/emqttd_auth_ldap.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 1f1370805..a52edac28 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -27,6 +27,10 @@ -include("emqttd.hrl"). +-export([passwd_hash/2]). + +-type hash_type() :: plain | md5 | sha | sha256. + %%%============================================================================= %%% Auth behavihour %%%============================================================================= @@ -53,3 +57,21 @@ behaviour_info(_Other) -> -endif. +%% @doc Password Hash +-spec passwd_hash(hash_type(), binary()) -> binary(). +passwd_hash(plain, Password) -> + Password; +passwd_hash(md5, Password) -> + hexstring(crypto:hash(md5, Password)); +passwd_hash(sha, Password) -> + hexstring(crypto:hash(sha, Password)); +passwd_hash(sha256, Password) -> + hexstring(crypto:hash(sha256, Password)). + +hexstring(<>) -> + iolist_to_binary(io_lib:format("~32.16.0b", [X])); +hexstring(<>) -> + iolist_to_binary(io_lib:format("~40.16.0b", [X])); +hexstring(<>) -> + iolist_to_binary(io_lib:format("~64.16.0b", [X])). + diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index aa831642c..a923aee6d 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -71,6 +71,9 @@ add_user(Username, Password) -> User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, mnesia:transaction(fun mnesia:write/1, [User]). +add_default_user(Username, Password) -> + add_user(bin(Username), bin(Password)). + %%------------------------------------------------------------------------------ %% @doc Lookup user by username %% @end @@ -98,13 +101,16 @@ all_users() -> %%%============================================================================= %%% emqttd_auth callbacks %%%============================================================================= -init(Opts) -> +init(DefautUsers) -> mnesia:create_table(?AUTH_USERNAME_TAB, [ {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies), + lists:foreach(fun({Username, Password}) -> + add_default_user(Username, Password) + end, DefautUsers), emqttd_ctl:register_cmd(users, {?MODULE, cli}, []), - {ok, Opts}. + {ok, []}. check(#mqtt_client{username = undefined}, _Password, _Opts) -> {error, "Username undefined"}; @@ -136,8 +142,11 @@ md5_hash(SaltBin, Password) -> erlang:md5(<>). salt() -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), + emqttd:seed_now(), Salt = random:uniform(16#ffffffff), <>. +bin(A) when is_atom(A) -> bin(atom_to_list(A)); +bin(L) when is_list(L) -> list_to_binary(L); +bin(B) when is_binary(B) -> B. + diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index cfdfa2863..4d4f1173c 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -172,7 +172,7 @@ dequeue(State = #state{mqueue = MQ}) -> {empty, MQ1} -> State#state{mqueue = MQ1}; {{value, Msg}, MQ1} -> - handle_info({dispatch, Msg}, State), + handle_info({dispatch, Msg#mqtt_message.topic, Msg}, State), dequeue(State#state{mqueue = MQ1}) end. diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 0c1a7e701..a7d93997e 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 0b6c09fc3..8f90fa01f 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -218,7 +218,7 @@ stop_tick(TRef) -> %%%============================================================================= init([]) -> - random:seed(os:timestamp()), + emqttd:seed_now(), ets:new(?BROKER_TAB, [set, public, named_table]), % Create $SYS Topics emqttd_pubsub:create(topic, <<"$SYS/brokers">>), diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 8e6000b4d..4af1b3088 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -101,7 +101,7 @@ broker(["metrics"]) -> end, lists:sort(emqttd_metrics:all())); broker(["pubsub"]) -> - Pubsubs = supervisor:which_children(emqttd_pubsub_sup), + Pubsubs = supervisor:which_children(emqttd_pubsub_sup:pubsub_pool()), foreach(fun({{_, Id}, Pid, _, _}) -> ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), ?PRINT("pubsub: ~w~n", [Id]), @@ -323,7 +323,7 @@ plugins(_) -> bridges(["list"]) -> foreach(fun({{Node, Topic}, _Pid}) -> - ?PRINT("bridge: ~s ~s~n", [Node, Topic]) + ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node]) end, emqttd_bridge_sup:bridges()); bridges(["options"]) -> @@ -449,7 +449,7 @@ trace(_) -> {"trace topic off", "stop to trace Topic"}]). trace_on(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of + case emqttd_trace:start_trace({Who, iolist_to_binary(Name)}, LogFile) of ok -> ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); {error, Error} -> @@ -457,7 +457,7 @@ trace_on(Who, Name, LogFile) -> end. trace_off(Who, Name) -> - case emqttd_trace:stop_trace({Who, bin(Name)}) of + case emqttd_trace:stop_trace({Who, iolist_to_binary(Name)}) of ok -> ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); {error, Error} -> @@ -507,17 +507,16 @@ print({{ClientId, _ClientPid}, SessInfo}) -> awaiting_rel, awaiting_ack, awaiting_comp, - created_at, - subscriptions], + created_at], ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, " "message_queue=~w, message_dropped=~w, " "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " - "created_at=~w, subscriptions=~s)~n", + "created_at=~w)~n", [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). print(topic, Topic, Records) -> Nodes = [Node || #mqtt_topic{node = Node} <- Records], - ?PRINT("~s: on ~p~n", [Topic, Nodes]); + ?PRINT("~s: ~p~n", [Topic, Nodes]); print(subscription, ClientId, Subscriptions) -> TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index d303fa332..6b47f0aad 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index c1a226aea..dc9f6b792 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_cm_sup.erl b/src/emqttd_cm_sup.erl index 6640c7cdd..d3389ab92 100644 --- a/src/emqttd_cm_sup.erl +++ b/src/emqttd_cm_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 41ef77eec..11c4b7277 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_dist.erl b/src/emqttd_dist.erl index f5b1a35fa..fc8a6241f 100644 --- a/src/emqttd_dist.erl +++ b/src/emqttd_dist.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_gen_mod.erl b/src/emqttd_gen_mod.erl index 59dbcc085..e36d7c283 100644 --- a/src/emqttd_gen_mod.erl +++ b/src/emqttd_gen_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index f19ea4448..8844e2577 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index a3228f7ff..4d52846e3 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index 2616f746b..9a3151836 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_log.erl b/src/emqttd_log.erl index 4be2f2999..c2f82408d 100644 --- a/src/emqttd_log.erl +++ b/src/emqttd_log.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 831528cfd..79ed3edb8 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 37b034355..0ef5bda1c 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -283,7 +283,7 @@ key(counter, Metric) -> %%%============================================================================= init([]) -> - random:seed(os:timestamp()), + emqttd:seed_now(), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 531913602..d299082d8 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 1eb7228d7..458616c7b 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index 6499a909f..e18ba2c75 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index a4d786f26..701d68f94 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -40,7 +40,7 @@ -endif. load(Opts) -> - Topics = [{bin(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], + Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], State = #state{topics = Topics, stored = lists:member(stored, Opts)}, emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [State]}), @@ -52,7 +52,9 @@ client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, #state{topics = Topics, stored = Stored}) -> Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]), - emqttd_client:subscribe(ClientPid, TopicTable). + emqttd_client:subscribe(ClientPid, TopicTable); + +client_connected(_ConnAck, _Client, _State) -> ok. with_stored(false, _ClientId, TopicTable) -> TopicTable; @@ -70,8 +72,3 @@ rep(<<"$u">>, undefined, Topic) -> rep(<<"$u">>, Username, Topic) -> emqttd_topic:feed_var(<<"$u">>, Username, Topic). -bin(B) when is_binary(B) -> - B; -bin(S) when is_list(S) -> - list_to_binary(S). - diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index 76c60d164..6d7d809fe 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 769ecab25..51227dcf9 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -30,20 +30,21 @@ %%% %%% If the broker restarted or crashed, all the messages queued will be gone. %%% -%%% Desgin of The Queue: +%%% Concept of Message Queue and Inflight Window: +%%% %%% |<----------------- Max Len ----------------->| %%% ----------------------------------------------- -%%% IN -> | Pending Messages | Inflight Window | -> Out +%%% IN -> | Messages Queue | Inflight Window | -> Out %%% ----------------------------------------------- -%%% |<--- Win Size --->| +%%% |<--- Win Size --->| %%% %%% -%%% 1. Inflight Window to store the messages awaiting for ack. +%%% 1. Inflight Window to store the messages delivered and awaiting for puback. %%% -%%% 2. Suspend IN messages when the queue is deactive, or inflight windows is full. +%%% 2. Enqueue messages when the inflight window is full. %%% %%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true, -%%% otherwise dropped the oldest pending one. +%%% otherwise dropped the oldest one. %%% %%% @end %%% @@ -55,96 +56,161 @@ -include("emqttd_protocol.hrl"). --export([new/3, name/1, - is_empty/1, is_full/1, - len/1, max_len/1, - in/2, out/1, - stats/1]). +-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]). -define(LOW_WM, 0.2). -define(HIGH_WM, 0.6). --record(mqueue, {name, - q = queue:new(), %% pending queue - len = 0, %% current queue len - low_wm = ?LOW_WM, - high_wm = ?HIGH_WM, - max_len = ?MAX_LEN, - qos0 = false, - dropped = 0, +-type priority() :: {iolist(), pos_integer()}. + +-type option() :: {type, simple | priority} + | {max_length, pos_integer() | infinity} + | {priority, list(priority())} + | {low_watermark, float()} %% Low watermark + | {high_watermark, float()} %% High watermark + | {queue_qos0, boolean()}. %% Queue Qos0? + +-type mqueue_option() :: {max_length, pos_integer()} %% Max queue length + | {low_watermark, float()} %% Low watermark + | {high_watermark, float()} %% High watermark + | {queue_qos0, boolean()}. %% Queue Qos0 + +-type stat() :: {max_len, infinity | pos_integer()} + | {len, non_neg_integer()} + | {dropped, non_neg_integer()}. + +-record(mqueue, {type :: simple | priority, + name, q :: queue:queue() | priority_queue:q(), + %% priority table + pseq = 0, priorities = [], + %% len of simple queue + len = 0, max_len = ?MAX_LEN, + low_wm = ?LOW_WM, high_wm = ?HIGH_WM, + qos0 = false, dropped = 0, alarm_fun}). -type mqueue() :: #mqueue{}. --type mqueue_option() :: {max_length, pos_integer()} %% Max queue length - | {low_watermark, float()} %% Low watermark - | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}. %% Queue Qos0 +-export_type([mqueue/0, priority/0, option/0]). --export_type([mqueue/0]). - -%%------------------------------------------------------------------------------ %% @doc New Queue. -%% @end -%%------------------------------------------------------------------------------ --spec new(binary(), list(mqueue_option()), fun()) -> mqueue(). +-spec new(iolist(), list(mqueue_option()), fun()) -> mqueue(). new(Name, Opts, AlarmFun) -> - MaxLen = emqttd_opts:g(max_length, Opts, 1000), - #mqueue{name = Name, - max_len = MaxLen, - low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)), - high_wm = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)), - qos0 = emqttd_opts:g(queue_qos0, Opts, true), - alarm_fun = AlarmFun}. + Type = emqttd_opts:g(type, Opts, simple), + MaxLen = emqttd_opts:g(max_length, Opts, infinity), + init_q(#mqueue{type = Type, name = iolist_to_binary(Name), + len = 0, max_len = MaxLen, + low_wm = low_wm(MaxLen, Opts), + high_wm = high_wm(MaxLen, Opts), + qos0 = emqttd_opts:g(queue_qos0, Opts, false), + alarm_fun = AlarmFun}, Opts). +init_q(MQ = #mqueue{type = simple}, _Opts) -> + MQ#mqueue{q = queue:new()}; +init_q(MQ = #mqueue{type = priority}, Opts) -> + Priorities = emqttd_opts:g(priority, Opts, []), + init_p(Priorities, MQ#mqueue{q = priority_queue:new()}). + +init_p([], MQ) -> + MQ; +init_p([{Topic, P} | L], MQ) -> + {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ), + init_p(L, MQ1). + +insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> + <> = <>, + {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. + +low_wm(infinity, _Opts) -> + infinity; +low_wm(MaxLen, Opts) -> + round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)). + +high_wm(infinity, _Opts) -> + infinity; +high_wm(MaxLen, Opts) -> + round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)). + +-spec name(mqueue()) -> iolist(). name(#mqueue{name = Name}) -> Name. -is_empty(#mqueue{len = 0}) -> true; -is_empty(_MQ) -> false. +-spec type(mqueue()) -> atom(). +type(#mqueue{type = Type}) -> + Type. -is_full(#mqueue{len = Len, max_len = MaxLen}) - when Len =:= MaxLen -> true; -is_full(_MQ) -> false. +is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; +is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q). -len(#mqueue{len = Len}) -> Len. +len(#mqueue{type = simple, len = Len}) -> Len; +len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q). max_len(#mqueue{max_len= MaxLen}) -> MaxLen. -stats(#mqueue{max_len = MaxLen, len = Len, dropped = Dropped}) -> - [{max_len, MaxLen}, {len, Len}, {dropped, Dropped}]. - -%%------------------------------------------------------------------------------ -%% @doc Queue one message. -%% @end -%%------------------------------------------------------------------------------ +%% @doc Stats of the mqueue +-spec stats(mqueue()) -> [stat()]. +stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> + [{len, case Type of + simple -> Len; + priority -> priority_queue:len(Q) + end} | [{max_len, MaxLen}, {dropped, Dropped}]]. +%% @doc Enqueue a message. -spec in(mqtt_message(), mqueue()) -> mqueue(). -%% drop qos0 in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; - -%% simply drop the oldest one if queue is full, improve later -in(Msg, MQ = #mqueue{q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) - when Len =:= MaxLen -> - {{value, _OldMsg}, Q2} = queue:out(Q), - %lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]), +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> + MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) + when Len >= MaxLen -> + {{value, _Old}, Q2} = queue:out(Q), MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1}; +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> + maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); -in(Msg, MQ = #mqueue{q = Q, len = Len}) -> - maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}). +in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, + priorities = Priorities, + max_len = infinity}) -> + case lists:keysearch(Topic, 1, Priorities) of + {value, {_, Pri}} -> + MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}; + false -> + {Pri, MQ1} = insert_p(Topic, 0, MQ), + MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + end; +in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, + priorities = Priorities, + max_len = MaxLen}) -> + case lists:keysearch(Topic, 1, Priorities) of + {value, {_, Pri}} -> + case priority_queue:plen(Pri, Q) >= MaxLen of + true -> + {_, Q1} = priority_queue:out(Pri, Q), + MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)}; + false -> + MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)} + end; + false -> + {Pri, MQ1} = insert_p(Topic, 0, MQ), + MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + end. -out(MQ = #mqueue{len = 0}) -> +out(MQ = #mqueue{type = simple, len = 0}) -> {empty, MQ}; - -out(MQ = #mqueue{q = Q, len = Len}) -> - {Result, Q2} = queue:out(Q), - {Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}. +out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> + {R, Q2} = queue:out(Q), + {R, MQ#mqueue{q = Q2, len = Len - 1}}; +out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> + {R, Q2} = queue:out(Q), + {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}; +out(MQ = #mqueue{type = priority, q = Q}) -> + {R, Q2} = priority_queue:out(Q), + {R, MQ#mqueue{q = Q2}}. maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) when Len > HighWM -> - Alarm = #mqtt_alarm{id = list_to_binary(["queue_high_watermark.", Name]), + Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), severity = warning, title = io_lib:format("Queue ~s high-water mark", [Name]), summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, diff --git a/src/emqttd_net.erl b/src/emqttd_net.erl index 1242ae9ee..9257c92ac 100644 --- a/src/emqttd_net.erl +++ b/src/emqttd_net.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_opts.erl b/src/emqttd_opts.erl index 147791cdd..4e4a75e66 100644 --- a/src/emqttd_opts.erl +++ b/src/emqttd_opts.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index 81c2ffd35..461696e69 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index d9bf6d806..b9eddc8c6 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 517be8564..df008cc7f 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl index 70a70a967..8884e97c1 100644 --- a/src/emqttd_pool_sup.erl +++ b/src/emqttd_pool_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 909690412..80fd400aa 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -53,10 +53,7 @@ start_link() -> %%%============================================================================= -spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. start_link(Pool, Id) -> - gen_server:start_link({local, name(Id)}, ?MODULE, [Pool, Id], []). - -name(Id) -> - list_to_atom(lists:concat([?MODULE, "_", integer_to_list(Id)])). + gen_server:start_link({local, emqttd:reg_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %%------------------------------------------------------------------------------ %% @doc Submit work to pooler diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index be86739e8..96acfcba7 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1d528402f..5ef0b915f 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,7 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc emqttd pubsub +%%% @doc PubSub %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- @@ -43,9 +43,7 @@ -export([start_link/4]). -export([create/2, lookup/2, subscribe/1, subscribe/2, - unsubscribe/1, unsubscribe/2, publish/1, delete/2]). - -%% Subscriptions API + publish/1, unsubscribe/1, unsubscribe/2, delete/2]). %% Local node -export([match/1]). @@ -62,8 +60,6 @@ -define(ROUTER, emqttd_router). --define(HELPER, emqttd_pubsub_helper). - %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= @@ -123,26 +119,19 @@ cache_env(Key) -> %%% API %%%============================================================================= -%%------------------------------------------------------------------------------ %% @doc Start one pubsub server -%% @end -%%------------------------------------------------------------------------------ -spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when Pool :: atom(), Id :: pos_integer(), - StatsFun :: fun(), + StatsFun :: fun((atom()) -> any()), Opts :: list(tuple()). start_link(Pool, Id, StatsFun, Opts) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []). + gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, + ?MODULE, [Pool, Id, StatsFun, Opts], []). -name(Id) -> - list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). - -%%------------------------------------------------------------------------------ %% @doc Create Topic or Subscription. -%% @end -%%------------------------------------------------------------------------------ --spec create(topic | subscription, binary() | {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}. +-spec create(topic, emqttd_topic:topic()) -> ok | {error, any()}; + (subscription, {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}. create(topic, Topic) when is_binary(Topic) -> Record = #mqtt_topic{topic = Topic, node = node()}, case mnesia:transaction(fun add_topic/1, [Record]) of @@ -151,39 +140,33 @@ create(topic, Topic) when is_binary(Topic) -> end; create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) -> - case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of + case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, ?QOS_I(Qos)}]) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error} end. -%%------------------------------------------------------------------------------ %% @doc Lookup Topic or Subscription. -%% @end -%%------------------------------------------------------------------------------ --spec lookup(topic | subscription, binary()) -> list(). -lookup(topic, Topic) -> +-spec lookup(topic, emqttd_topic:topic()) -> list(mqtt_topic()); + (subscription, binary()) -> list(mqtt_subscription()). +lookup(topic, Topic) when is_binary(Topic) -> mnesia:dirty_read(topic, Topic); -lookup(subscription, ClientId) -> - mnesia:dirty_read(subscription, ClientId). +lookup(subscription, SubId) when is_binary(SubId) -> + mnesia:dirty_read(subscription, SubId). -%%------------------------------------------------------------------------------ %% @doc Delete Topic or Subscription. -%% @end -%%------------------------------------------------------------------------------ +-spec delete(topic, emqttd_topic:topic()) -> ok | {error, any()}; + (subscription, binary() | {binary(), emqttd_topic:topic()}) -> ok. delete(topic, _Topic) -> {error, unsupported}; -delete(subscription, ClientId) when is_binary(ClientId) -> - mnesia:dirty_delete({subscription, ClientId}); +delete(subscription, SubId) when is_binary(SubId) -> + mnesia:dirty_delete({subscription, SubId}); -delete(subscription, {ClientId, Topic}) when is_binary(ClientId) -> - mnesia:async_dirty(fun remove_subscriptions/2, [ClientId, [Topic]]). +delete(subscription, {SubId, Topic}) when is_binary(SubId) andalso is_binary(Topic) -> + mnesia:async_dirty(fun remove_subscriptions/2, [SubId, [Topic]]). -%%------------------------------------------------------------------------------ %% @doc Subscribe Topics -%% @end -%%------------------------------------------------------------------------------ -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), @@ -206,34 +189,28 @@ subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTa fixqos(TopicTable) -> [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable]. -call(Request) -> - PubSub = gproc_pool:pick_worker(pubsub, self()), - gen_server2:call(PubSub, Request, infinity). - -%%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics -%% @end -%%------------------------------------------------------------------------------ --spec unsubscribe(binary() | list(binary())) -> ok. +-spec unsubscribe(emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> unsubscribe([Topic]); unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> cast({unsubscribe, {undefined, self()}, Topics}). --spec unsubscribe(binary(), binary() | list(binary())) -> ok. +-spec unsubscribe(binary(), emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok. unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> unsubscribe(ClientId, [Topic]); unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) -> cast({unsubscribe, {ClientId, self()}, Topics}). -cast(Msg) -> - PubSub = gproc_pool:pick_worker(pubsub, self()), - gen_server2:cast(PubSub, Msg). +call(Request) -> + gen_server2:call(pick(self()), Request, infinity). + +cast(Msg) -> + gen_server2:cast(pick(self()), Msg). + +pick(Self) -> gproc_pool:pick_worker(pubsub, Self). -%%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes -%% @end -%%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), @@ -257,35 +234,41 @@ publish(To, Msg) -> end end, match(To)). -%%------------------------------------------------------------------------------ %% @doc Match Topic Name with Topic Filters -%% @end -%%------------------------------------------------------------------------------ --spec match(binary()) -> [mqtt_topic()]. +-spec match(emqttd_topic:topic()) -> [mqtt_topic()]. match(To) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), - %% ets:lookup for topic table will be replicated. + %% ets:lookup for topic table will be replicated to all nodes. lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Pool, Id, StatsFun, Opts]) -> - ?ROUTER:init(Opts), +init([Pool, Id, StatsFun, _Opts]) -> ?GPROC_POOL(join, Pool, Id), {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}. handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State = #state{statsfun = StatsFun}) -> + %% Monitor SubPid first + try_monitor(SubPid), + + %% Topics Topics = [Topic || {Topic, _Qos} <- TopicTable], - %% Add routes first - ?ROUTER:add_routes(Topics, SubPid), + NewTopics = Topics -- reverse_routes(SubPid), - %% Insert topic records to global topic table - Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics], + %% Add routes + ?ROUTER:add_routes(NewTopics, SubPid), + + insert_reverse_routes(SubPid, NewTopics), + + StatsFun(reverse_route), + + %% Insert topic records to mnesia + Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics], case mnesia:transaction(fun add_topics/1, [Records]) of {atomic, _} -> @@ -307,9 +290,14 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) -> + %% Delete routes first ?ROUTER:delete_routes(Topics, SubPid), + delete_reverse_routes(SubPid, Topics), + + StatsFun(reverse_route), + %% Remove subscriptions if_subscription( fun(_) -> @@ -317,19 +305,21 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St emqttd_pooler:async_submit({mnesia, async_dirty, Args}), StatsFun(subscription) end), + {noreply, State}; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{statsfun = StatsFun}) -> - Routes = ?ROUTER:lookup_routes(DownPid), + Topics = reverse_routes(DownPid), - %% Delete all routes of the process - ?ROUTER:delete_routes(DownPid), + ?ROUTER:delete_routes(Topics, DownPid), - ?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]), + delete_reverse_routes(DownPid), + + StatsFun(reverse_route), {noreply, State, hibernate}; @@ -395,6 +385,31 @@ remove_subscriptions(SubId, Topics) -> delete_subscription(Record) -> mnesia:delete_object(subscription, Record, write). +reverse_routes(SubPid) -> + case ets:member(reverse_route, SubPid) of + true -> + try ets:lookup_element(reverse_route, SubPid, 2) catch error:badarg -> [] end; + false -> + [] + end. + +insert_reverse_routes(SubPid, Topics) -> + ets:insert(reverse_route, [{SubPid, Topic} || Topic <- Topics]). + +delete_reverse_routes(SubPid, Topics) -> + lists:foreach(fun(Topic) -> + ets:delete_object(reverse_route, {SubPid, Topic}) + end, Topics). + +delete_reverse_routes(SubPid) -> + ets:delete(reverse_route, SubPid). + +try_monitor(SubPid) -> + case ets:member(reverse_route, SubPid) of + true -> ignore; + false -> erlang:monitor(process, SubPid) + end. + %%%============================================================================= %%% Trace Functions %%%============================================================================= diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index 4ae8452fb..820280445 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,176 +19,66 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc PubSub Route Aging Helper +%%% @doc PubSub Helper. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- -module(emqttd_pubsub_helper). --behaviour(gen_server2). +-behaviour(gen_server). -include("emqttd.hrl"). -include("emqttd_internal.hrl"). %% API Function Exports --export([start_link/2, aging/1]). +-export([start_link/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --ifdef(TEST). --compile(export_all). --endif. - --record(aging, {topics, time, tref}). - --record(state, {aging :: #aging{}, statsfun}). +-record(state, {statsfun}). -define(SERVER, ?MODULE). --define(ROUTER, emqttd_router). - -%%%============================================================================= -%%% API -%%%============================================================================= - -%%------------------------------------------------------------------------------ -%% @doc Start pubsub helper. -%% @end -%%------------------------------------------------------------------------------ --spec start_link(fun(), list(tuple())) -> {ok, pid()} | ignore | {error, any()}. -start_link(StatsFun, Opts) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [StatsFun, Opts], []). - -%%------------------------------------------------------------------------------ -%% @doc Aging topics -%% @end -%%------------------------------------------------------------------------------ --spec aging(list(binary())) -> ok. -aging(Topics) -> - gen_server2:cast(?SERVER, {aging, Topics}). +%% @doc Start PubSub Helper. +-spec start_link(fun()) -> {ok, pid()} | ignore | {error, any()}. +start_link(StatsFun) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([StatsFun, Opts]) -> +init([StatsFun]) -> mnesia:subscribe(system), - - AgingSecs = proplists:get_value(route_aging, Opts, 5), - - %% Aging Timer - {ok, AgingTref} = start_tick(AgingSecs div 2), - - {ok, #state{aging = #aging{topics = dict:new(), - time = AgingSecs, - tref = AgingTref}, - statsfun = StatsFun}}. - -start_tick(Secs) -> - timer:send_interval(timer:seconds(Secs), {clean, aged}). + {ok, #state{statsfun = StatsFun}}. handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({aging, Topics}, State = #state{aging = Aging}) -> - #aging{topics = Dict} = Aging, - TS = emqttd_util:now_to_secs(), - Dict1 = - lists:foldl(fun(Topic, Acc) -> - case dict:find(Topic, Acc) of - {ok, _} -> Acc; - error -> dict:store(Topic, TS, Acc) - end - end, Dict, Topics), - {noreply, State#state{aging = Aging#aging{topics = Dict1}}}; - handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({clean, aged}, State = #state{aging = Aging}) -> - - #aging{topics = Dict, time = Time} = Aging, - - ByTime = emqttd_util:now_to_secs() - Time, - - Dict1 = try_clean(ByTime, dict:to_list(Dict)), - - NewAging = Aging#aging{topics = dict:from_list(Dict1)}, - - noreply(State#state{aging = NewAging}); - handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - %% mnesia master? + %% TODO: mnesia master? Pattern = #mqtt_topic{_ = '_', node = Node}, F = fun() -> [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, Pattern, write)] end, - mnesia:async_dirty(F), - noreply(State); + mnesia:transaction(F), noreply(State); handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, #state{aging = #aging{tref = TRef}}) -> - timer:cancel(TRef). +terminate(_Reason, _State) -> + mnesia:unsubscribe(system). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%%============================================================================= -%%% Internal Functions -%%%============================================================================= - noreply(State = #state{statsfun = StatsFun}) -> - StatsFun(topic), - {noreply, State, hibernate}. - -try_clean(ByTime, List) -> - try_clean(ByTime, List, []). - -try_clean(_ByTime, [], Acc) -> - Acc; - -try_clean(ByTime, [{Topic, TS} | Left], Acc) -> - case ?ROUTER:has_route(Topic) of - false -> - try_clean2(ByTime, {Topic, TS}, Left, Acc); - true -> - try_clean(ByTime, Left, Acc) - end. - -try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime -> - try_clean(ByTime, Left, [{Topic, TS}|Acc]); - -try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - mnesia:transaction(fun try_remove_topic/1, [TopicR]), - try_clean(ByTime, Left, Acc). - -try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> - %% Lock topic first - case mnesia:wread({topic, Topic}) of - [] -> ok; - [TopicR] -> - if_no_route(Topic, fun() -> - %% Remove topic and trie - mnesia:delete_object(topic, TopicR, write), - emqttd_trie:delete(Topic) - end); - _More -> - if_no_route(Topic, fun() -> - %% Remove topic - mnesia:delete_object(topic, TopicR, write) - end) - end. - -if_no_route(Topic, Fun) -> - case ?ROUTER:has_route(Topic) of - true -> ok; - false -> Fun() - end. + StatsFun(topic), {noreply, State}. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 63d6efd6e..592b8a7d8 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,7 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc PubSub Supervisor +%%% @doc PubSub Supervisor. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- @@ -31,8 +31,10 @@ -define(HELPER, emqttd_pubsub_helper). +-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). + %% API --export([start_link/0]). +-export([start_link/0, pubsub_pool/0]). %% Supervisor callbacks -export([init/1]). @@ -40,24 +42,57 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_broker:env(pubsub)]). -init([Opts]) -> +pubsub_pool() -> + hd([Pid|| {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). + +init([Env]) -> + %% Create tabs + create_tab(route), create_tab(reverse_route), + %% PubSub Helper - Helper = {helper, {?HELPER, start_link, [fun stats/1, Opts]}, + Helper = {helper, {?HELPER, start_link, [fun setstats/1]}, permanent, infinity, worker, [?HELPER]}, + %% Router Pool Sup + RouterMFA = {emqttd_router, start_link, [fun setstats/1, Env]}, + %% Pool_size / 2 + RouterSup = emqttd_pool_sup:spec(router_pool, [router, hash, pool_size(Env) div 2, RouterMFA]), + %% PubSub Pool Sup - MFA = {emqttd_pubsub, start_link, [fun stats/1, Opts]}, - PoolSup = emqttd_pool_sup:spec([pubsub, hash, pool_size(Opts), MFA]), - {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}. + PubSubMFA = {emqttd_pubsub, start_link, [fun setstats/1, Env]}, + PubSubSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]), -pool_size(Opts) -> + {ok, {{one_for_all, 10, 60}, [Helper, RouterSup, PubSubSup]}}. + +create_tab(route) -> + %% Route Table: Topic -> Pid1, Pid2, ..., PidN + %% duplicate_bag: o(1) insert + ensure_tab(route, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + +create_tab(reverse_route) -> + %% Reverse Route Table: Pid -> Topic1, Topic2, ..., TopicN + ensure_tab(reverse_route, [public, named_table, bag | ?CONCURRENCY_OPTS]). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of + undefined -> ets:new(Tab, Opts); + _ -> ok + end. + +pool_size(Env) -> Schedulers = erlang:system_info(schedulers), - proplists:get_value(pool_size, Opts, Schedulers). + proplists:get_value(pool_size, Env, Schedulers). -stats(topic) -> - emqttd_stats:setstats('topics/count', 'topics/max', - mnesia:table_info(topic, size)); -stats(subscription) -> +setstats(route) -> + emqttd_stats:setstat('routes/count', ets:info(route, size)); + +setstats(reverse_route) -> + emqttd_stats:setstat('routes/reverse', ets:info(reverse_route, size)); + +setstats(topic) -> + emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size)); + +setstats(subscription) -> emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', mnesia:table_info(subscription, size)). diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index 00523f259..5d21ffa29 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 9f10f3b58..183a4b5a0 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,167 +19,267 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc MQTT Message Router on Local Node -%%% -%%% Route Table: -%%% -%%% Topic -> Pid1, Pid2, ... -%%% -%%% Reverse Route Table: -%%% -%%% Pid -> Topic1, Topic2, ... -%%% -%%% @end +%%% @doc Message Router on local node. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- + -module(emqttd_router). +-behaviour(gen_server2). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). --export([init/1, route/2, lookup_routes/1, has_route/1, - add_routes/2, delete_routes/1, delete_routes/2]). +-include("emqttd_internal.hrl"). --ifdef(TEST). --compile(export_all). --endif. +-export([start_link/4]). -%%------------------------------------------------------------------------------ -%% @doc Create route tables. -%% @end -%%------------------------------------------------------------------------------ -init(_Opts) -> - TabOpts = [bag, public, named_table, - {write_concurrency, true}], - %% Route Table: Topic -> {Pid, QoS} - %% Route Shard: {Topic, Shard} -> {Pid, QoS} - ensure_tab(route, TabOpts), +%% Route API +-export([route/2]). - %% Reverse Route Table: Pid -> {Topic, QoS} - ensure_tab(reverse_route, TabOpts). +%% Route Admin API +-export([add_route/2, lookup_routes/1, has_route/1, delete_route/2]). -ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of - undefined -> - ets:new(Tab, Opts); - _ -> - ok - end. +%% Batch API +-export([add_routes/2, delete_routes/2]). --ifdef(TEST). -destory() -> - ets:delete(route), - ets:delete(reverse_route). --endif. +%% For Test +-export([stop/1]). -%%------------------------------------------------------------------------------ -%% @doc Add Routes. -%% @end -%%------------------------------------------------------------------------------ --spec add_routes(list(binary()), pid()) -> ok. -add_routes(Topics, Pid) when is_pid(Pid) -> - with_stats(fun() -> - case lookup_routes(Pid) of - [] -> - erlang:monitor(process, Pid), - insert_routes(Topics, Pid); - InEts -> - insert_routes(Topics -- InEts, Pid) - end - end). +%% gen_server Callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -%%------------------------------------------------------------------------------ -%% @doc Lookup Routes -%% @end -%%------------------------------------------------------------------------------ --spec lookup_routes(pid()) -> list(binary()). -lookup_routes(Pid) when is_pid(Pid) -> - [Topic || {_, Topic} <- ets:lookup(reverse_route, Pid)]. +-record(aging, {topics, time, tref}). -%%------------------------------------------------------------------------------ -%% @doc Has Route? -%% @end -%%------------------------------------------------------------------------------ --spec has_route(binary()) -> boolean(). -has_route(Topic) -> - ets:member(route, Topic). +-record(state, {pool, id, statsfun, aging :: #aging{}}). -%%------------------------------------------------------------------------------ -%% @doc Delete Routes -%% @end -%%------------------------------------------------------------------------------ --spec delete_routes(list(binary()), pid()) -> ok. -delete_routes(Topics, Pid) -> - with_stats(fun() -> - Routes = [{Topic, Pid} || Topic <- Topics], - lists:foreach(fun delete_route/1, Routes) - end). +%% @doc Start a local router. +-spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}. +start_link(Pool, Id, StatsFun, Env) -> + gen_server2:start_link({local, emqttd:reg_name(?MODULE,Id)}, + ?MODULE, [Pool, Id, StatsFun, Env], []). --spec delete_routes(pid()) -> ok. -delete_routes(Pid) when is_pid(Pid) -> - with_stats(fun() -> - Routes = [{Topic, Pid} || Topic <- lookup_routes(Pid)], - ets:delete(reverse_route, Pid), - lists:foreach(fun delete_route_only/1, Routes) - end). - -%%------------------------------------------------------------------------------ -%% @doc Route Message on Local Node. -%% @end -%%------------------------------------------------------------------------------ --spec route(binary(), mqtt_message()) -> non_neg_integer(). +%% @doc Route Message on the local node. +-spec route(emqttd_topic:topic(), mqtt_message()) -> any(). route(Queue = <<"$Q/", _Q>>, Msg) -> - case ets:lookup(route, Queue) of + case lookup_routes(Queue) of [] -> emqttd_metrics:inc('messages/dropped'); + [SubPid] -> + SubPid ! {dispatch, Queue, Msg}; Routes -> Idx = crypto:rand_uniform(1, length(Routes) + 1), - {_, SubPid} = lists:nth(Idx, Routes), - dispatch(SubPid, Queue, Msg) + SubPid = lists:nth(Idx, Routes), + SubPid ! {dispatch, Queue, Msg} end; route(Topic, Msg) -> - case ets:lookup(route, Topic) of + case lookup_routes(Topic) of [] -> emqttd_metrics:inc('messages/dropped'); + [SubPid] -> %% optimize + SubPid ! {dispatch, Topic, Msg}; Routes -> - lists:foreach(fun({_Topic, SubPid}) -> - dispatch(SubPid, Topic, Msg) - end, Routes) + lists:foreach(fun(SubPid) -> + SubPid ! {dispatch, Topic, Msg} + end, Routes) end. -dispatch(SubPid, Topic, Msg) -> SubPid ! {dispatch, Topic, Msg}. +%% @doc Has Route? +-spec has_route(emqttd_topic:topic()) -> boolean(). +has_route(Topic) -> + ets:member(route, Topic). -%%%============================================================================= -%%% Internal Functions -%%%============================================================================= +%% @doc Lookup Routes +-spec lookup_routes(emqttd_topic:topic()) -> list(pid()). +lookup_routes(Topic) when is_binary(Topic) -> + case ets:member(route, Topic) of + true -> + try ets:lookup_element(route, Topic, 2) catch error:badarg -> [] end; + false -> + [] + end. -insert_routes([], _Pid) -> +%% @doc Add Route. +-spec add_route(emqttd_topic:topic(), pid()) -> ok. +add_route(Topic, Pid) when is_pid(Pid) -> + call(pick(Topic), {add_route, Topic, Pid}). + +%% @doc Add Routes. +-spec add_routes(list(emqttd_topic:topic()), pid()) -> ok. +add_routes([], _Pid) -> ok; -insert_routes(Topics, Pid) -> - {Routes, ReverseRoutes} = routes(Topics, Pid), - ets:insert(route, Routes), - ets:insert(reverse_route, ReverseRoutes). +add_routes([Topic], Pid) -> + add_route(Topic, Pid); -routes(Topics, Pid) -> - lists:unzip([{{Topic, Pid}, {Pid, Topic}} || Topic <- Topics]). +add_routes(Topics, Pid) -> + lists:foreach(fun({Router, Slice}) -> + call(Router, {add_routes, Slice, Pid}) + end, slice(Topics)). -delete_route({Topic, Pid}) -> - ets:delete_object(reverse_route, {Pid, Topic}), - ets:delete_object(route, {Topic, Pid}). +%% @doc Delete Route. +-spec delete_route(emqttd_topic:topic(), pid()) -> ok. +delete_route(Topic, Pid) -> + cast(pick(Topic), {delete_route, Topic, Pid}). -delete_route_only({Topic, Pid}) -> - ets:delete_object(route, {Topic, Pid}). +%% @doc Delete Routes. +-spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok. +delete_routes([Topic], Pid) -> + delete_route(Topic, Pid); -with_stats(Fun) -> - Ok = Fun(), setstats(), Ok. +delete_routes(Topics, Pid) -> + lists:foreach(fun({Router, Slice}) -> + cast(Router, {delete_routes, Slice, Pid}) + end, slice(Topics)). -setstats() -> - lists:foreach(fun setstat/1, [{route, 'routes/count'}, - {reverse_route, 'routes/reverse'}]). +%% @private Slice topics. +slice(Topics) -> + dict:to_list(lists:foldl(fun(Topic, Dict) -> + dict:append(pick(Topic), Topic, Dict) + end, dict:new(), Topics)). -setstat({Tab, Stat}) -> - emqttd_stats:setstat(Stat, ets:info(Tab, size)). +%% @private Pick a router. +pick(Topic) -> + gproc_pool:pick_worker(router, Topic). + +stop(Id) when is_integer(Id) -> + gen_server2:call(emqttd:reg_name(?MODULE, Id), stop). + +call(Router, Request) -> + gen_server2:call(Router, Request, infinity). + +cast(Router, Msg) -> + gen_server2:cast(Router, Msg). + +init([Pool, Id, StatsFun, Opts]) -> + + %% Calls from pubsub should be scheduled first? + process_flag(priority, high), + + ?GPROC_POOL(join, Pool, Id), + + emqttd:seed_now(), + + AgingSecs = proplists:get_value(route_aging, Opts, 5), + + %% Aging Timer + {ok, AgingTref} = start_tick(AgingSecs + random:uniform(AgingSecs)), + + Aging = #aging{topics = dict:new(), time = AgingSecs, tref = AgingTref}, + + {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, aging = Aging}}. + +start_tick(Secs) -> + timer:send_interval(timer:seconds(Secs), {clean, aged}). + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; + +handle_call({add_route, Topic, Pid}, _From, State) -> + ets:insert(route, {Topic, Pid}), + {reply, ok, setstats(State)}; + +handle_call({add_routes, Topics, Pid}, _From, State) -> + ets:insert(route, [{Topic, Pid} || Topic <- Topics]), + {reply, ok, setstats(State)}; + +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + +handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) -> + ets:delete_object(route, {Topic, Pid}), + NewState = + case has_route(Topic) of + false -> State#state{aging = store_aged(Topic, Aging)}; + true -> State + end, + {noreply, setstats(NewState)}; + +handle_cast({delete_routes, Topics, Pid}, State) -> + NewAging = + lists:foldl(fun(Topic, Aging) -> + ets:delete_object(route, {Topic, Pid}), + case has_route(Topic) of + false -> store_aged(Topic, Aging); + true -> Aging + end + end, State#state.aging, Topics), + {noreply, setstats(State#state{aging = NewAging})}; + +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). + +handle_info({clean, aged}, State = #state{aging = Aging}) -> + + #aging{topics = Dict, time = Time} = Aging, + + ByTime = emqttd_util:now_to_secs() - Time, + + Dict1 = try_clean(ByTime, dict:to_list(Dict)), + + NewAging = Aging#aging{topics = dict:from_list(Dict1)}, + + {noreply, State#state{aging = NewAging}, hibernate}; + +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). + +terminate(_Reason, #state{pool = Pool, id = Id, aging = #aging{tref = TRef}}) -> + timer:cancel(TRef), + ?GPROC_POOL(leave, Pool, Id). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +try_clean(ByTime, List) -> + try_clean(ByTime, List, []). + +try_clean(_ByTime, [], Acc) -> + Acc; + +try_clean(ByTime, [{Topic, TS} | Left], Acc) -> + case has_route(Topic) of + false -> + try_clean2(ByTime, {Topic, TS}, Left, Acc); + true -> + try_clean(ByTime, Left, Acc) + end. + +try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime -> + try_clean(ByTime, Left, [{Topic, TS} | Acc]); + +try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + case mnesia:transaction(fun try_remove_topic/1, [TopicR]) of + {atomic, _} -> ok; + {aborted, Error} -> lager:error("Clean Topic '~s' Error: ~p", [Topic, Error]) + end, + try_clean(ByTime, Left, Acc). + +try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> + %% Lock topic first + case mnesia:wread({topic, Topic}) of + [] -> + ok; %% mnesia:abort(not_found); + [TopicR] -> + %% Remove topic and trie + delete_topic(TopicR), + emqttd_trie:delete(Topic); + _More -> + %% Remove topic only + delete_topic(TopicR) + end. + +delete_topic(TopicR) -> + mnesia:delete_object(topic, TopicR, write). + +store_aged(Topic, Aging = #aging{topics = Dict}) -> + Now = emqttd_util:now_to_secs(), + Aging#aging{topics = dict:store(Topic, Now, Dict)}. + +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(route), State. diff --git a/src/emqttd_serializer.erl b/src/emqttd_serializer.erl index 9589340b7..6cded9b47 100644 --- a/src/emqttd_serializer.erl +++ b/src/emqttd_serializer.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index af7fec0ca..f5f3d7579 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index 96d1da656..49a25546a 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index a9ff6659b..52cd5534b 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -85,10 +85,7 @@ mnesia(copy) -> %%------------------------------------------------------------------------------ -spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}. start_link(Pool, Id) -> - gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id], []). - -name(Id) -> - list_to_atom("emqttd_sm_" ++ integer_to_list(Id)). + gen_server2:start_link({local, emqttd:reg_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %%------------------------------------------------------------------------------ %% @doc Start a session diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 4fcf8c967..c4baa04ad 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 39c4eb5ee..83e426b0d 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 97d9b8027..7a3f0be6f 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -143,7 +143,7 @@ setstats(Stat, MaxStat, Val) -> %%%============================================================================= init([]) -> - random:seed(os:timestamp()), + emqttd:seed_now(), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), diff --git a/src/emqttd_sup.erl b/src/emqttd_sup.erl index 111dba0c4..b157454e1 100644 --- a/src/emqttd_sup.erl +++ b/src/emqttd_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index bf18c31ca..4c6fcd0a1 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_sysmon_sup.erl b/src/emqttd_sysmon_sup.erl index 40649596d..6fcd3347e 100644 --- a/src/emqttd_sysmon_sup.erl +++ b/src/emqttd_sysmon_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index a77f330cc..3dcae94b0 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -31,6 +31,8 @@ -export([join/1, feed_var/3, is_queue/1, systop/1]). +-type topic() :: binary(). + %-type type() :: static | dynamic. -type word() :: '' | '+' | '#' | binary(). @@ -39,7 +41,7 @@ -type triple() :: {root | binary(), word(), binary()}. --export_type([word/0, triple/0]). +-export_type([topic/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 4096). @@ -47,7 +49,7 @@ %% @doc Is wildcard topic? %% @end %%%----------------------------------------------------------------------------- --spec wildcard(binary()) -> true | false. +-spec wildcard(topic()) -> true | false. wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); wildcard([]) -> @@ -64,8 +66,8 @@ wildcard([_H|T]) -> %% @end %%------------------------------------------------------------------------------ -spec match(Name, Filter) -> boolean() when - Name :: binary() | words(), - Filter :: binary() | words(). + Name :: topic() | words(), + Filter :: topic() | words(). match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> match(words(Name), words(Filter)); match([], []) -> @@ -91,7 +93,7 @@ match([], [_H|_T2]) -> %% @doc Validate Topic %% @end %%------------------------------------------------------------------------------ --spec validate({name | filter, binary()}) -> boolean(). +-spec validate({name | filter, topic()}) -> boolean(). validate({_, <<>>}) -> false; validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> @@ -129,7 +131,7 @@ validate3(<<_/utf8, Rest/binary>>) -> %% @doc Topic to Triples %% @end %%%----------------------------------------------------------------------------- --spec triples(binary()) -> list(triple()). +-spec triples(topic()) -> list(triple()). triples(Topic) when is_binary(Topic) -> triples(words(Topic), root, []). @@ -154,7 +156,7 @@ bin(B) when is_binary(B) -> B. %% @doc Split Topic Path to Words %% @end %%------------------------------------------------------------------------------ --spec words(binary()) -> words(). +-spec words(topic()) -> words(). words(Topic) when is_binary(Topic) -> [word(W) || W <- binary:split(Topic, <<"/">>, [global])]. @@ -167,7 +169,7 @@ word(Bin) -> Bin. %% @doc Queue is a special topic name that starts with "$Q/" %% @end %%------------------------------------------------------------------------------ --spec is_queue(binary()) -> boolean(). +-spec is_queue(topic()) -> boolean(). is_queue(<<"$Q/", _Queue/binary>>) -> true; is_queue(_) -> diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 9ddeb8fb5..d3b8eec01 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_trace_sup.erl b/src/emqttd_trace_sup.erl index e5cfd4e78..6dc248201 100644 --- a/src/emqttd_trace_sup.erl +++ b/src/emqttd_trace_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_trie.erl b/src/emqttd_trie.erl index a5f9ac9e5..13db64b24 100644 --- a/src/emqttd_trie.erl +++ b/src/emqttd_trie.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_util.erl b/src/emqttd_util.erl index fcb06d4c3..0cee8e0d4 100644 --- a/src/emqttd_util.erl +++ b/src/emqttd_util.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index 2b0e1b85f..182fcd804 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 8b06d9367..437fc2898 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/lager_emqtt_backend.erl b/src/lager_emqtt_backend.erl index 7bf35844c..f9d3424f7 100644 --- a/src/lager_emqtt_backend.erl +++ b/src/lager_emqtt_backend.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 88c69513d..ad92553be 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -37,11 +37,10 @@ %% calls into the same function knowing that ordinary queues represent %% a base case. - -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1, - in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1, + in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]). %%---------------------------------------------------------------------------- @@ -58,6 +57,7 @@ -spec(is_queue/1 :: (any()) -> boolean()). -spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(plen/2 :: (priority(), pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). -spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()). -spec(in/2 :: (any(), pqueue()) -> pqueue()). @@ -96,6 +96,16 @@ len({queue, _R, _F, L}) -> len({pqueue, Queues}) -> lists:sum([len(Q) || {_, Q} <- Queues]). +plen(0, {queue, _R, _F, L}) -> + L; +plen(P, {queue, _R, _F, _}) -> + erlang:error(badarg, [P]); +plen(P, {pqueue, Queues}) -> + case lists:keysearch(maybe_negate_priority(P), 1, Queues) of + {value, {_, Q}} -> len(Q); + false -> 0 + end. + to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> @@ -159,6 +169,28 @@ out({pqueue, [{P, Q} | Queues]}) -> out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). +out(0, {queue, _, _, _} = Q) -> + out(Q); +out(Priority, {queue, _, _, _}) -> + erlang:error(badarg, [Priority]); +out(Priority, {pqueue, Queues}) -> + P = maybe_negate_priority(Priority), + case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + {R, Q1} = out(Q), + Queues1 = case is_empty(Q1) of + true -> lists:keydelete(P, 1, Queues); + false -> lists:keyreplace(P, 1, Queues, {P, Q1}) + end, + {R, case Queues1 of + [] -> {queue, [], [], 0}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues1} + end}; + false -> + {empty, {pqueue, Queues}} + end. + add_p(R, P) -> case R of {empty, Q} -> {empty, Q}; {{value, V}, Q} -> {{value, V, P}, Q} diff --git a/test/emqttd_access_control_tests.erl b/test/emqttd_access_control_tests.erl index 5da45f4a8..e3030fa96 100644 --- a/test/emqttd_access_control_tests.erl +++ b/test/emqttd_access_control_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_access_rule_tests.erl b/test/emqttd_access_rule_tests.erl index f46f23ce4..7308f572a 100644 --- a/test/emqttd_access_rule_tests.erl +++ b/test/emqttd_access_rule_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_acl_test_mod.erl b/test/emqttd_acl_test_mod.erl index 45ad3e133..3991e3dde 100644 --- a/test/emqttd_acl_test_mod.erl +++ b/test/emqttd_acl_test_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_auth_anonymous_test_mod.erl b/test/emqttd_auth_anonymous_test_mod.erl index a69d24630..010a72dad 100644 --- a/test/emqttd_auth_anonymous_test_mod.erl +++ b/test/emqttd_auth_anonymous_test_mod.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_guid_tests.erl b/test/emqttd_guid_tests.erl index 1ddf0610c..c53b5df79 100644 --- a/test/emqttd_guid_tests.erl +++ b/test/emqttd_guid_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_keepalive_tests.erl b/test/emqttd_keepalive_tests.erl index 96f84450a..e4b0d63df 100644 --- a/test/emqttd_keepalive_tests.erl +++ b/test/emqttd_keepalive_tests.erl @@ -1,6 +1,6 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_mqueue_tests.erl b/test/emqttd_mqueue_tests.erl index 8e6c4fb16..6d20b3cbf 100644 --- a/test/emqttd_mqueue_tests.erl +++ b/test/emqttd_mqueue_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -24,7 +24,7 @@ -include("emqttd.hrl"). --define(QM, emqttd_mqueue). +-define(Q, emqttd_mqueue). -ifdef(TEST). @@ -33,39 +33,115 @@ in_test() -> Opts = [{max_length, 5}, {queue_qos0, true}], - Q = ?QM:new(<<"testQ">>, Opts, alarm_fun()), - ?assertEqual(true, ?QM:is_empty(Q)), - Q1 = ?QM:in(#mqtt_message{}, Q), - ?assertEqual(1, ?QM:len(Q1)), - Q2 = ?QM:in(#mqtt_message{qos = 1}, Q1), - ?assertEqual(2, ?QM:len(Q2)), - Q3 = ?QM:in(#mqtt_message{qos = 2}, Q2), - Q4 = ?QM:in(#mqtt_message{}, Q3), - Q5 = ?QM:in(#mqtt_message{}, Q4), - ?assertEqual(true, ?QM:is_full(Q5)). + Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), + ?assertEqual(true, ?Q:is_empty(Q)), + Q1 = ?Q:in(#mqtt_message{}, Q), + ?assertEqual(1, ?Q:len(Q1)), + Q2 = ?Q:in(#mqtt_message{qos = 1}, Q1), + ?assertEqual(2, ?Q:len(Q2)), + Q3 = ?Q:in(#mqtt_message{qos = 2}, Q2), + Q4 = ?Q:in(#mqtt_message{}, Q3), + Q5 = ?Q:in(#mqtt_message{}, Q4), + ?assertEqual(5, ?Q:len(Q5)). in_qos0_test() -> Opts = [{max_length, 5}, {queue_qos0, false}], - Q = ?QM:new(<<"testQ">>, Opts, alarm_fun()), - Q1 = ?QM:in(#mqtt_message{}, Q), - ?assertEqual(true, ?QM:is_empty(Q1)), - Q2 = ?QM:in(#mqtt_message{qos = 0}, Q1), - ?assertEqual(true, ?QM:is_empty(Q2)). + Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), + Q1 = ?Q:in(#mqtt_message{}, Q), + ?assertEqual(true, ?Q:is_empty(Q1)), + Q2 = ?Q:in(#mqtt_message{qos = 0}, Q1), + ?assertEqual(true, ?Q:is_empty(Q2)). out_test() -> Opts = [{max_length, 5}, {queue_qos0, true}], - Q = ?QM:new(<<"testQ">>, Opts, alarm_fun()), - ?assertMatch({empty, Q}, ?QM:out(Q)), - Q1 = ?QM:in(#mqtt_message{}, Q), - {Value, Q2} = ?QM:out(Q1), - ?assertEqual(0, ?QM:len(Q2)), + Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), + ?assertMatch({empty, Q}, ?Q:out(Q)), + Q1 = ?Q:in(#mqtt_message{}, Q), + {Value, Q2} = ?Q:out(Q1), + ?assertEqual(0, ?Q:len(Q2)), ?assertMatch({value, #mqtt_message{}}, Value). + +simple_mqueue_test() -> + Opts = [{type, simple}, + {max_length, 3}, + {low_watermark, 0.2}, + {high_watermark, 0.6}, + {queue_qos0, false}], + Q = ?Q:new("simple_queue", Opts, alarm_fun()), + ?assertEqual(simple, ?Q:type(Q)), + ?assertEqual(3, ?Q:max_len(Q)), + ?assertEqual(<<"simple_queue">>, ?Q:name(Q)), + ?assert(?Q:is_empty(Q)), + Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q), + Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1), + Q3 = ?Q:in(#mqtt_message{qos = 1, payload = <<"3">>}, Q2), + Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3), + ?assertEqual(3, ?Q:len(Q4)), + {{value, Msg}, Q5} = ?Q:out(Q4), + ?assertMatch(<<"2">>, Msg#mqtt_message.payload), + ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). + +infinity_simple_mqueue_test() -> + Opts = [{type, simple}, + {max_length, infinity}, + {low_watermark, 0.2}, + {high_watermark, 0.6}, + {queue_qos0, false}], + Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()), + ?assert(?Q:is_empty(Q)), + ?assertEqual(infinity, ?Q:max_len(Q)), + Qx = lists:foldl(fun(I, AccQ) -> + ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ) + end, Q, lists:seq(1, 255)), + ?assertEqual(255, ?Q:len(Qx)), + ?assertEqual([{len, 255}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)), + {{value, V}, Qy} = ?Q:out(Qx), + ?assertEqual(<<1>>, V#mqtt_message.payload). + +priority_mqueue_test() -> + Opts = [{type, priority}, + {priority, [{<<"t">>, 10}]}, + {max_length, 3}, + {low_watermark, 0.2}, + {high_watermark, 0.6}, + {queue_qos0, false}], + Q = ?Q:new("priority_queue", Opts, alarm_fun()), + ?assertEqual(priority, ?Q:type(Q)), + ?assertEqual(3, ?Q:max_len(Q)), + ?assertEqual(<<"priority_queue">>, ?Q:name(Q)), + + ?assert(?Q:is_empty(Q)), + Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q), + Q2 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t">>}, Q1), + Q3 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t2">>}, Q2), + ?assertEqual(3, ?Q:len(Q3)), + Q4 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q3), + ?assertEqual(4, ?Q:len(Q4)), + Q5 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q4), + ?assertEqual(5, ?Q:len(Q5)), + Q6 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q5), + ?assertEqual(5, ?Q:len(Q6)), + {{value, Msg}, Q7} = ?Q:out(Q6), + ?assertMatch(<<"t">>, Msg#mqtt_message.topic). + +infinity_priority_mqueue_test() -> + Opts = [{type, priority}, + {priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]}, + {max_length, infinity}, + {queue_qos0, false}], + Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()), + ?assertEqual(infinity, ?Q:max_len(Q)), + Qx = lists:foldl(fun(I, AccQ) -> + AccQ1 = + ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), + ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) + end, Q, lists:seq(1, 255)), + ?assertEqual(510, ?Q:len(Qx)), + ?assertEqual([{len, 510}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)). -alarm_fun() -> - fun(_, _) -> alarm_fun() end. +alarm_fun() -> fun(_, _) -> alarm_fun() end. -endif. - diff --git a/test/emqttd_opts_tests.erl b/test/emqttd_opts_tests.erl index 50d654327..42d1bac8e 100644 --- a/test/emqttd_opts_tests.erl +++ b/test/emqttd_opts_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_parser_tests.erl b/test/emqttd_parser_tests.erl index 9af6de6b0..dc85c7abd 100644 --- a/test/emqttd_parser_tests.erl +++ b/test/emqttd_parser_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_router_tests.erl b/test/emqttd_router_tests.erl index 14cad08e3..14e4aa51c 100644 --- a/test/emqttd_router_tests.erl +++ b/test/emqttd_router_tests.erl @@ -1,3 +1,24 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2016, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- -module(emqttd_router_tests). @@ -7,33 +28,103 @@ -include_lib("eunit/include/eunit.hrl"). --define(ROUTER, emqttd_router). +-define(R, emqttd_router). route_test_() -> - {setup, - fun() -> ?ROUTER:init([]) end, - fun(_) -> ?ROUTER:destory() end, - [?_test(t_add_routes()), + {foreach, + fun setup/0, fun teardown/1, + [?_test(t_add_route()), + ?_test(t_add_routes()), + ?_test(t_delete_route()), ?_test(t_delete_routes()), - ?_test(t_has_route()), ?_test(t_route()) ]}. +setup() -> + application:start(gproc), + ensure_tab(route, [public, named_table, duplicate_bag]), + gproc_pool:new(router, hash, [{size, 2}]), + lists:foreach(fun(I) -> + gproc_pool:add_worker(router, {router, I}, I), + {ok, R} = ?R:start_link(router, I, fun(_) -> ok end, []) + end, [1, 2]). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of + undefined -> ets:new(Tab, Opts); + _ -> ok + end. + +teardown(_) -> + lists:foreach(fun(I) -> + ?R:stop(I), gproc_pool:remove_worker(router, {router, I}) + end, [1, 2]), + gproc_pool:delete(router), + ets:delete(route). + +t_add_route() -> + Self = self(), + ?R:add_route(<<"topic1">>, Self), + ?assert(?R:has_route(<<"topic1">>)), + ?R:add_route(<<"topic2">>, Self), + ?assert(?R:has_route(<<"topic2">>)), + ?assertEqual([Self], ?R:lookup_routes(<<"topic1">>)), + ?assertEqual([Self], ?R:lookup_routes(<<"topic2">>)). + t_add_routes() -> - Pid = self(), - ok. - %?ROUTER:add_routes([<<"a">>, <<"b">>], Pid), - %?assertEqual([{<<"a">>, Pid}, {<<"b">>, Pid}], lists:sort(ets:tab2list(route))), - %?assertEqual([{Pid, <<"a">>}, {Pid, <<"b">>}], lists:sort(ets:tab2list(reverse_route))). + Self = self(), + ?R:add_routes([], Self), + ?R:add_routes([<<"t0">>], Self), + ?R:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self), + ?assert(?R:has_route(<<"t1">>)), + ?assertEqual([Self], ?R:lookup_routes(<<"t1">>)), + ?assertEqual([Self], ?R:lookup_routes(<<"t2">>)), + ?assertEqual([Self], ?R:lookup_routes(<<"t3">>)). + +t_delete_route() -> + Self = self(), + ?R:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self), + ?assert(?R:has_route(<<"t1">>)), + ?R:delete_route(<<"t2">>, Self), + erlang:yield(), + ?assertNot(?R:has_route(<<"t2">>)), + ?assert(?R:has_route(<<"t1">>)), + ?R:delete_route(<<"t3">>, Self), + erlang:yield(), + ?assertNot(?R:has_route(<<"t3">>)). t_delete_routes() -> - ok. - -t_has_route() -> - ok. + Self = self(), + ?R:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self), + ?R:delete_routes([<<"t3">>], Self), + erlang:yield(), %% for delete_routes is cast + ?assertNot(?R:has_route(<<"t3">>)), + ?R:delete_routes([<<"t1">>, <<"t2">>], Self), + erlang:yield(), + ?assertNot(?R:has_route(<<"t2">>)). t_route() -> - ok. + Self = self(), + Pid = spawn_link(fun() -> timer:sleep(1000) end), + ?R:add_routes([<<"$Q/1">>,<<"t/2">>,<<"t/3">>], Self), + ?R:add_routes([<<"t/2">>], Pid), + Msg1 = #mqtt_message{topic = <<"$Q/1">>, payload = <<"q">>}, + Msg2 = #mqtt_message{topic = <<"t/2">>, payload = <<"t2">>}, + Msg3 = #mqtt_message{topic = <<"t/3">>, payload = <<"t3">>}, + ?R:route(<<"$Q/1">>, Msg1), + ?R:route(<<"t/2">>, Msg2), + ?R:route(<<"t/3">>, Msg3), + ?assertEqual([Msg1, Msg2, Msg3], recv_loop([])), + ?R:add_route(<<"$Q/1">>, Pid), + ?R:route(<<"$Q/1">>, Msg1). + +recv_loop(Msgs) -> + receive + {dispatch, _Topic, Msg} -> + recv_loop([Msg|Msgs]) + after + 500 -> lists:reverse(Msgs) + end. -endif. diff --git a/test/emqttd_serializer_tests.erl b/test/emqttd_serializer_tests.erl index 02bebc6af..3a3aee519 100644 --- a/test/emqttd_serializer_tests.erl +++ b/test/emqttd_serializer_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_tests.erl b/test/emqttd_tests.erl new file mode 100644 index 000000000..5f0685824 --- /dev/null +++ b/test/emqttd_tests.erl @@ -0,0 +1,33 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2016, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- + +-module(emqttd_tests). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +seed_now_test() -> + ?assertNotEqual(emqttd:seed_now(), emqttd:seed_now()). + +-endif. + diff --git a/test/emqttd_topic_tests.erl b/test/emqttd_topic_tests.erl index 5487caf74..a48fba9c4 100644 --- a/test/emqttd_topic_tests.erl +++ b/test/emqttd_topic_tests.erl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee +%% Copyright (c) 2012-2016, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal diff --git a/test/emqttd_trie_tests.erl b/test/emqttd_trie_tests.erl index 524ca79f6..f7de97d43 100644 --- a/test/emqttd_trie_tests.erl +++ b/test/emqttd_trie_tests.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee +%%% @Copyright (C) 2012-2016, Feng Lee %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal diff --git a/test/priority_queue_tests.erl b/test/priority_queue_tests.erl new file mode 100644 index 000000000..8e044ee4e --- /dev/null +++ b/test/priority_queue_tests.erl @@ -0,0 +1,69 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2016, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- + +-module(priority_queue_tests). + +-include("emqttd.hrl"). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(PQ, priority_queue). + +plen_test() -> + Q = ?PQ:new(), + ?assertEqual(0, ?PQ:plen(0, Q)), + Q0 = ?PQ:in(z, Q), + ?assertEqual(1, ?PQ:plen(0, Q0)), + Q1 = ?PQ:in(x, 1, Q), + ?assertEqual(1, ?PQ:plen(1, Q1)), + Q2 = ?PQ:in(y, 2, Q1), + ?assertEqual(1, ?PQ:plen(2, Q2)), + Q3 = ?PQ:in(z, 2, Q2), + ?assertEqual(2, ?PQ:plen(2, Q3)). + +out2_test() -> + Els = [a, {b, 1}, {c, 1}, {d, 2}, {e, 2}, {f, 2}], + Q = ?PQ:new(), + Q0 = lists:foldl( + fun({El, P}, Q) -> + ?PQ:in(El, P, Q); + (El, Q) -> + ?PQ:in(El, Q) + end, Q, Els), + {Val, Q1} = ?PQ:out(Q0), + ?assertEqual({value, d}, Val), + {Val1, Q2} = ?PQ:out(2, Q1), + ?assertEqual({value, e}, Val1), + {Val2, Q3} = ?PQ:out(1, Q2), + ?assertEqual({value, b}, Val2), + {Val3, Q4} = ?PQ:out(Q3), + ?assertEqual({value, f}, Val3), + {Val4, Q5} = ?PQ:out(Q4), + ?assertEqual({value, c}, Val4), + {Val5, Q6} = ?PQ:out(Q5), + ?assertEqual({value, a}, Val5), + {empty, _Q7} = ?PQ:out(Q6). + +-endif. +