diff --git a/apps/emqttd/include/emqttd_packet.hrl b/apps/emqtt/include/emqtt_packet.hrl similarity index 95% rename from apps/emqttd/include/emqttd_packet.hrl rename to apps/emqtt/include/emqtt_packet.hrl index b173b885e..408fccb60 100644 --- a/apps/emqttd/include/emqttd_packet.hrl +++ b/apps/emqtt/include/emqtt_packet.hrl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -author("feng@emqtt.io"). %%------------------------------------------------------------------------------ @@ -224,16 +225,3 @@ -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). -%%------------------------------------------------------------------------------ -%% MQTT Message -%%------------------------------------------------------------------------------ --record(mqtt_message, { - qos = ?QOS_0 :: mqtt_qos(), - retain = false :: boolean(), - dup = false :: boolean(), - msgid :: mqtt_packet_id(), - topic :: binary(), - payload :: binary()}). - --type mqtt_message() :: #mqtt_message{}. - diff --git a/apps/emqtt/src/emqtt.app.src b/apps/emqtt/src/emqtt.app.src new file mode 100644 index 000000000..e19a3f14a --- /dev/null +++ b/apps/emqtt/src/emqtt.app.src @@ -0,0 +1,14 @@ +{application, emqtt, + [ + {description, "Erlang MQTT Common Library"}, + {vsn, "0.6.0"}, + {modules, [ + emqtt_parser + ]}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {env, []} + ]}. diff --git a/apps/emqtt/src/emqtt.erl b/apps/emqtt/src/emqtt.erl new file mode 100644 index 000000000..bd733119e --- /dev/null +++ b/apps/emqtt/src/emqtt.erl @@ -0,0 +1,18 @@ +-module(emqtt). + +%% emqtt: emqtt library's entry point. + +-export([my_func/0]). + + +%% API + +my_func() -> + ok(). + +%% Internals + +ok() -> + ok. + +%% End of Module. diff --git a/apps/emqttd/src/emqttd_packet.erl b/apps/emqtt/src/emqtt_packet.erl similarity index 98% rename from apps/emqttd/src/emqttd_packet.erl rename to apps/emqtt/src/emqtt_packet.erl index b21ddb68b..d5d86cc54 100644 --- a/apps/emqttd/src/emqttd_packet.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -20,15 +20,15 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd packet. +%%% emqtt packet. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_packet). +-module(emqtt_packet). -author("feng@emqtt.io"). --include("emqttd_packet.hrl"). +-include("emqtt_packet.hrl"). %% API -export([protocol_name/1, type_name/1, connack_name/1]). diff --git a/apps/emqttd/src/emqttd_parser.erl b/apps/emqtt/src/emqtt_parser.erl similarity index 99% rename from apps/emqttd/src/emqttd_parser.erl rename to apps/emqtt/src/emqtt_parser.erl index 1390843e4..098942f42 100644 --- a/apps/emqttd/src/emqttd_parser.erl +++ b/apps/emqtt/src/emqtt_parser.erl @@ -20,15 +20,15 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd received packet parser. +%%% emqtt packet parser. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_parser). +-module(emqtt_parser). -author("feng@emqtt.io"). --include("emqttd_packet.hrl"). +-include("emqtt_packet.hrl"). %% API -export([init/1, parse/2]). diff --git a/apps/emqttd/src/emqttd_serialiser.erl b/apps/emqtt/src/emqtt_serialiser.erl similarity index 98% rename from apps/emqttd/src/emqttd_serialiser.erl rename to apps/emqtt/src/emqtt_serialiser.erl index ab7d81bd2..cc4b1f0d0 100644 --- a/apps/emqttd/src/emqttd_serialiser.erl +++ b/apps/emqtt/src/emqtt_serialiser.erl @@ -20,15 +20,15 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd packet serialiser. +%%% emqtt packet serialiser. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_serialiser). +-module(emqtt_serialiser). -author("feng@emqtt.io"). --include("emqttd_packet.hrl"). +-include("emqtt_packet.hrl"). %% API -export([serialise/1]). diff --git a/apps/emqttd/src/emqttd_topic.erl b/apps/emqtt/src/emqtt_topic.erl similarity index 89% rename from apps/emqttd/src/emqttd_topic.erl rename to apps/emqtt/src/emqtt_topic.erl index 33b541c0d..e9f4a1a05 100644 --- a/apps/emqttd/src/emqttd_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -20,19 +20,17 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd topic. +%%% emqtt topic. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_topic). +-module(emqtt_topic). -author('feng@emqtt.io'). --include("emqttd_topic.hrl"). - -import(lists, [reverse/1]). --export([new/1, wildcard/1, match/2, validate/1, triples/1, words/1]). +-export([match/2, validate/1, triples/1, words/1, wildcard/1]). %-type type() :: static | dynamic. @@ -48,23 +46,11 @@ %%%----------------------------------------------------------------------------- %% @doc -%% New Topic +%% Is wildcard topic? %% %% @end %%%----------------------------------------------------------------------------- --spec new(binary()) -> topic(). -new(Name) when is_binary(Name) -> - #topic{name = Name, node = node()}. - -%%%----------------------------------------------------------------------------- -%% @doc -%% Is Wildcard Topic? -%% -%% @end -%%%----------------------------------------------------------------------------- --spec wildcard(topic() | binary()) -> true | false. -wildcard(#topic{name = Name}) when is_binary(Name) -> - wildcard(Name); +-spec wildcard(binary()) -> true | false. wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); wildcard([]) -> diff --git a/apps/emqttd/test/emqttd_parser_tests.erl b/apps/emqtt/test/emqtt_parser_tests.erl similarity index 92% rename from apps/emqttd/test/emqttd_parser_tests.erl rename to apps/emqtt/test/emqtt_parser_tests.erl index 505aa02d4..149fba546 100644 --- a/apps/emqttd/test/emqttd_parser_tests.erl +++ b/apps/emqtt/test/emqtt_parser_tests.erl @@ -20,20 +20,20 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd_parser tests. +%%% emqtt_parser tests. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_parser_tests). +-module(emqtt_parser_tests). --include("emqttd_packet.hrl"). +-include("emqtt_packet.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). parse_connect_test() -> - State = emqttd_parser:init([]), + State = emqtt_parser:init([]), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, ?assertMatch({ok, #mqtt_packet{ @@ -45,7 +45,7 @@ parse_connect_test() -> proto_name = <<"MQIsdp">>, client_id = <<"mosqpub/10451-iMac.loca">>, clean_sess = true, - keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)), + keep_alive = 60}}, <<>>}, emqtt_parser:parse(V31ConnBin, State)), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, ?assertMatch({ok, #mqtt_packet{ @@ -57,7 +57,7 @@ parse_connect_test() -> proto_name = <<"MQTT">>, client_id = <<"mosqpub/10451-iMac.loca">>, clean_sess = true, - keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)), + keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnBin, State)), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60) V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, @@ -70,7 +70,7 @@ parse_connect_test() -> proto_name = <<"MQTT">>, client_id = <<>>, clean_sess = true, - keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)), + keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnWithoutClientId, State)), %%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg)) ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>, ?assertMatch({ok, #mqtt_packet{ @@ -90,11 +90,11 @@ parse_connect_test() -> will_msg = <<"willmsg">> , username = <<"test">>, password = <<"public">>}}, - <<>> }, emqttd_parser:parse(ConnBinWithWill, State)), + <<>> }, emqtt_parser:parse(ConnBinWithWill, State)), ok. parse_publish_test() -> - State = emqttd_parser:init([]), + State = emqtt_parser:init([]), %%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>) PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>, ?assertMatch({ok, #mqtt_packet{ @@ -104,7 +104,7 @@ parse_publish_test() -> retain = false}, variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>, packet_id = 1}, - payload = <<"hahah">> }, <<>>}, emqttd_parser:parse(PubBin, State)), + payload = <<"hahah">> }, <<>>}, emqtt_parser:parse(PubBin, State)), %PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>) %DISCONNECT(Qos=0, Retain=false, Dup=false) @@ -116,13 +116,13 @@ parse_publish_test() -> retain = false}, variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>, packet_id = undefined}, - payload = <<"hello">> }, <<224,0>>}, emqttd_parser:parse(PubBin1, State)), + payload = <<"hello">> }, <<224,0>>}, emqtt_parser:parse(PubBin1, State)), ?assertMatch({ok, #mqtt_packet{ header = #mqtt_packet_header{type = ?DISCONNECT, dup = false, qos = 0, retain = false} - }, <<>>}, emqttd_parser:parse(<<224, 0>>, State)). + }, <<>>}, emqtt_parser:parse(<<224, 0>>, State)). parse_puback_test() -> %%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1) @@ -132,7 +132,7 @@ parse_puback_test() -> dup = false, qos = 0, retain = false } - }, <<>>}, emqttd_parser:parse(PubAckBin, emqttd_parser:init([]))), + }, <<>>}, emqtt_parser:parse(PubAckBin, emqtt_parser:init([]))), ok. parse_subscribe_test() -> @@ -149,7 +149,7 @@ parse_disconnect_test() -> dup = false, qos = 0, retain = false} - }, <<>>}, emqttd_parser:parse(Bin, emqttd_parser:init([]))). + }, <<>>}, emqtt_parser:parse(Bin, emqtt_parser:init([]))). -endif. diff --git a/apps/emqttd/test/emqttd_serialiser_tests.erl b/apps/emqtt/test/emqtt_serialiser_tests.erl similarity index 67% rename from apps/emqttd/test/emqttd_serialiser_tests.erl rename to apps/emqtt/test/emqtt_serialiser_tests.erl index 3f0cb03ca..03f38833d 100644 --- a/apps/emqttd/test/emqttd_serialiser_tests.erl +++ b/apps/emqtt/test/emqtt_serialiser_tests.erl @@ -20,57 +20,57 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd_serialiser tests. +%%% emqtt_serialiser tests. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_serialiser_tests). +-module(emqtt_serialiser_tests). --include("emqttd_packet.hrl"). +-include("emqtt_packet.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). serialise_connect_test() -> - emqttd_serialiser:serialise(?CONNECT_PACKET(#mqtt_packet_connect{})). + emqtt_serialiser:serialise(?CONNECT_PACKET(#mqtt_packet_connect{})). serialise_connack_test() -> ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}}, - ?assertEqual(<<32,2,0,0>>, emqttd_serialiser:serialise(ConnAck)). + ?assertEqual(<<32,2,0,0>>, emqtt_serialiser:serialise(ConnAck)). serialise_publish_test() -> - emqttd_serialiser:serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)), - emqttd_serialiser:serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)). + emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)), + emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)). serialise_puback_test() -> - emqttd_serialiser:serialise(?PUBACK_PACKET(?PUBACK, 10384)). + emqtt_serialiser:serialise(?PUBACK_PACKET(?PUBACK, 10384)). serialise_pubrel_test() -> - emqttd_serialiser:serialise(?PUBREL_PACKET(10384)). + emqtt_serialiser:serialise(?PUBREL_PACKET(10384)). serialise_subscribe_test() -> TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}], - emqttd_serialiser:serialise(?SUBSCRIBE_PACKET(10, TopicTable)). + emqtt_serialiser:serialise(?SUBSCRIBE_PACKET(10, TopicTable)). serialise_suback_test() -> - emqttd_serialiser:serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])). + emqtt_serialiser:serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])). serialise_unsubscribe_test() -> - emqttd_serialiser:serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])). + emqtt_serialiser:serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])). serialise_unsuback_test() -> - emqttd_serialiser:serialise(?UNSUBACK_PACKET(10)). + emqtt_serialiser:serialise(?UNSUBACK_PACKET(10)). serialise_pingreq_test() -> - emqttd_serialiser:serialise(?PACKET(?PINGREQ)). + emqtt_serialiser:serialise(?PACKET(?PINGREQ)). serialise_pingresp_test() -> - emqttd_serialiser:serialise(?PACKET(?PINGRESP)). + emqtt_serialiser:serialise(?PACKET(?PINGRESP)). serialise_disconnect_test() -> - emqttd_serialiser:serialise(?PACKET(?DISCONNECT)). + emqtt_serialiser:serialise(?PACKET(?DISCONNECT)). -endif. diff --git a/apps/emqttd/test/emqttd_topic_tests.erl b/apps/emqtt/test/emqtt_topic_tests.erl similarity index 93% rename from apps/emqttd/test/emqttd_topic_tests.erl rename to apps/emqtt/test/emqtt_topic_tests.erl index a911f9b58..029015216 100644 --- a/apps/emqttd/test/emqttd_topic_tests.erl +++ b/apps/emqtt/test/emqtt_topic_tests.erl @@ -19,11 +19,9 @@ %% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %% SOFTWARE. %%------------------------------------------------------------------------------ --module(emqttd_topic_tests). +-module(emqtt_topic_tests). --include("emqttd_topic.hrl"). - --import(emqttd_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]). +-import(emqtt_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]). -ifdef(TEST). @@ -101,9 +99,9 @@ triples_perf_test() -> ok. type_test() -> - ?assertEqual(false, wildcard(#topic{name = <<"/a/b/cdkd">>})), - ?assertEqual(true, wildcard(#topic{name = <<"/a/+/d">>})), - ?assertEqual(true, wildcard(#topic{name = <<"/a/b/#">>})). + ?assertEqual(false, wildcard(<<"/a/b/cdkd">>)), + ?assertEqual(true, wildcard(<<"/a/+/d">>)), + ?assertEqual(true, wildcard(<<"/a/b/#">>)). words_test() -> ?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)), diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 45d2a7330..c96f59d5f 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -20,7 +20,7 @@ %% SOFTWARE. %%------------------------------------------------------------------------------ %%% @doc -%%% emqtt header. +%%% emqttd header. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -37,16 +37,38 @@ -define(ERTS_MINIMUM, "6.0"). %%------------------------------------------------------------------------------ -%% PubSub Type +%% PubSub %%------------------------------------------------------------------------------ -type pubsub() :: publish | subscribe. +%%------------------------------------------------------------------------------ +%% MQTT Topic +%%------------------------------------------------------------------------------ +-record(mqtt_topic, { + topic :: binary(), + node :: node() +}). + +-type mqtt_topic() :: #mqtt_topic{}. + +%%------------------------------------------------------------------------------ +%% MQTT Topic Subscriber +%%------------------------------------------------------------------------------ +-record(mqtt_subscriber, { + topic :: binary(), + qos = 0 :: 0 | 1 | 2, + subpid :: pid() +}). + +-type mqtt_subscriber() :: #mqtt_subscriber{}. + %%------------------------------------------------------------------------------ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - client_id, - username + clientid :: binary(), + username :: binary() | undefined, + ipaddr :: inet:ip_address() }). -type mqtt_client() :: #mqtt_client{}. @@ -55,7 +77,7 @@ %% MQTT Session %%------------------------------------------------------------------------------ -record(mqtt_session, { - client_id, + clientid, session_pid, subscriptions = [], awaiting_ack, @@ -65,24 +87,21 @@ -type mqtt_session() :: #mqtt_session{}. %%------------------------------------------------------------------------------ -%% MQTT User Management -%%------------------------------------------------------------------------------ --record(mqtt_user, { - username :: binary(), - ipaddr :: inet:ip_address(), - clientid :: binary() -}). - --type mqtt_user() :: #mqtt_user{}. - -%%------------------------------------------------------------------------------ -%% MQTT Authorization +%% MQTT Message %%------------------------------------------------------------------------------ -%% {subscribe, From, Topic} -%% {publish, From, Topic} +-type mqtt_msgid() :: undefined | 1..16#ffff. -%%TODO: ClientId | Username --> Pub | Sub --> Topics +-record(mqtt_message, { + %% topic is first for message may be retained + topic :: binary(), + qos = 0 :: 0 | 1 | 2, + retain = false :: boolean(), + dup = false :: boolean(), + msgid :: mqtt_msgid(), + payload :: binary()}). + +-type mqtt_message() :: #mqtt_message{}. %%------------------------------------------------------------------------------ %% MQTT Plugin @@ -90,11 +109,3 @@ -record(mqtt_plugin, {name, version, attrs, description}). -%%------------------------------------------------------------------------------ -%% MQTT Retained Message -%%------------------------------------------------------------------------------ --record(message_retained, {topic, qos, payload}). - --type message_retained() :: #message_retained{}. - - diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index 7d367f68f..ac226a54e 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -73,7 +73,7 @@ compile(who, {user, Username}) -> {user, bin(Username)}; compile(topic, Topic) -> - Words = emqttd_topic:words(Topic), + Words = emqtt_topic:words(Topic), case 'pattern?'(Words) of true -> {pattern, Words}; false -> Words @@ -126,18 +126,18 @@ match_topics(_User, _Topic, []) -> false; match_topics(User, Topic, [{pattern, PatternFilter}|Filters]) -> TopicFilter = feed_var(User, PatternFilter), - case match_topic(emqttd_topic:words(Topic), TopicFilter) of + case match_topic(emqtt_topic:words(Topic), TopicFilter) of true -> true; false -> match_topics(User, Topic, Filters) end; match_topics(User, Topic, [TopicFilter|Filters]) -> - case match_topic(emqttd_topic:words(Topic), TopicFilter) of + case match_topic(emqtt_topic:words(Topic), TopicFilter) of true -> true; false -> match_topics(User, Topic, Filters) end. match_topic(Topic, TopicFilter) -> - emqttd_topic:match(Topic, TopicFilter). + emqtt_topic:match(Topic, TopicFilter). feed_var(User, Pattern) -> feed_var(User, Pattern, []). diff --git a/apps/emqttd/src/emqttd_acl.erl b/apps/emqttd/src/emqttd_acl.erl index 1184d87ce..399f3d9d2 100644 --- a/apps/emqttd/src/emqttd_acl.erl +++ b/apps/emqttd/src/emqttd_acl.erl @@ -53,8 +53,8 @@ -callback init(AclOpts :: list()) -> {ok, State :: any()}. --callback check_acl({User, PubSub, Topic}, State :: any()) -> allow | deny | ignore when - User :: mqtt_user(), +-callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when + Client :: mqtt_client(), PubSub :: pubsub(), Topic :: binary(). diff --git a/apps/emqttd/src/emqttd_auth.erl b/apps/emqttd/src/emqttd_auth.erl index 8c3a95eae..3a44c49fc 100644 --- a/apps/emqttd/src/emqttd_auth.erl +++ b/apps/emqttd/src/emqttd_auth.erl @@ -46,10 +46,10 @@ -ifdef(use_specs). --callback check(User, Password, State) -> ok | ignore | {error, string()} when - User :: mqtt_user(), - Password :: binary(), - State :: any(). +-callback check(Client, Password, State) -> ok | ignore | {error, string()} when + Client :: mqtt_client(), + Password :: binary(), + State :: any(). -callback description() -> string(). @@ -68,18 +68,18 @@ behaviour_info(_Other) -> start_link(AuthMods) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthMods], []). --spec login(mqtt_user(), undefined | binary()) -> ok | {error, string()}. -login(User, Password) when is_record(User, mqtt_user) -> +-spec login(mqtt_client(), undefined | binary()) -> ok | {error, string()}. +login(Client, Password) when is_record(Client, mqtt_client) -> [{_, AuthMods}] = ets:lookup(?AUTH_TABLE, auth_modules), - check(User, Password, AuthMods). + check(Client, Password, AuthMods). -check(_User, _Password, []) -> +check(_Client, _Password, []) -> {error, "No auth module to check!"}; -check(User, Password, [{Mod, State} | Mods]) -> - case Mod:check(User, Password, State) of +check(Client, Password, [{Mod, State} | Mods]) -> + case Mod:check(Client, Password, State) of ok -> ok; {error, Reason} -> {error, Reason}; - ignore -> check(User, Password, Mods) + ignore -> check(Client, Password, Mods) end. add_module(Mod, Opts) -> diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 765f988e1..946928e30 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -30,7 +30,7 @@ -behaviour(gen_server). --include("emqttd_packet.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). %% API Function Exports -export([start_link/3]). diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 8bca804ba..6c7823e5c 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_broker). --include("emqttd_packet.hrl"). +-include("emqttd.hrl"). -include("emqttd_systop.hrl"). diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 30d1ed41a..b1f6c837e 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -41,7 +41,7 @@ -include("emqttd.hrl"). --include("emqttd_packet.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). %%Client State... -record(state, {transport, diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index 730456633..10a76446c 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_event). --include("emqttd_packet.hrl"). +-include("emqttd.hrl"). %% API Function Exports -export([start_link/0, diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index aea237f5f..f4da0d8b3 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -30,8 +30,6 @@ -include("emqttd.hrl"). --include("emqttd_packet.hrl"). - -import(proplists, [get_value/2, get_value/3]). -export([handle/1]). @@ -87,7 +85,7 @@ validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); validate(topic, Topic) -> - emqttd_topic:validate({name, Topic}). + emqtt_topic:validate({name, Topic}). int(S) -> list_to_integer(S). diff --git a/apps/emqttd/src/emqttd_message.erl b/apps/emqttd/src/emqttd_message.erl index 0e17bbe26..5f84ef77b 100644 --- a/apps/emqttd/src/emqttd_message.erl +++ b/apps/emqttd/src/emqttd_message.erl @@ -30,7 +30,7 @@ -include("emqttd.hrl"). --include("emqttd_packet.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). -export([from_packet/1, to_packet/1]). diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 09758a6a4..ffb019374 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -28,7 +28,7 @@ -author('feng@emqtt.io'). --include("emqttd_packet.hrl"). +-include("emqttd.hrl"). -include("emqttd_systop.hrl"). diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index c70e27d77..4cf33dd12 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -28,7 +28,7 @@ -include("emqttd.hrl"). --include("emqttd_packet.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). %% API -export([init/2, client_id/1]). @@ -93,7 +93,7 @@ received(_Packet, State = #proto_state{connected = false}) -> received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername, client_id = ClientId}) -> - lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), + lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), case validate_packet(Packet) of ok -> handle(Packet, State); @@ -110,7 +110,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = keep_alive = KeepAlive, client_id = ClientId} = Var, - lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), + lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), State1 = State#proto_state{proto_ver = ProtoVer, username = Username, @@ -226,7 +226,7 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession}); send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) -> - lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), + lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), sent_stats(Packet), Data = emqttd_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), @@ -297,7 +297,7 @@ validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, variable = #mqtt_packet_publish{topic_name = Topic}}) -> - case emqttd_topic:validate({name, Topic}) of + case emqtt_topic:validate({name, Topic}) of true -> ok; false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} end; @@ -321,7 +321,7 @@ validate_topics(Type, []) when Type =:= name orelse Type =:= filter -> validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter -> ErrTopics = [Topic || {Topic, Qos} <- Topics, - not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))], + not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))], case ErrTopics of [] -> ok; _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic} diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 40bd5e509..082800939 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -30,9 +30,7 @@ -include("emqttd.hrl"). --include("emqttd_topic.hrl"). - --include("emqttd_packet.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). -behaviour(gen_server). @@ -68,20 +66,20 @@ mnesia(create) -> ok = emqttd_mnesia:create_table(topic, [ {type, bag}, {ram_copies, [node()]}, - {record_name, topic}, - {attributes, record_info(fields, topic)}]), + {record_name, mqtt_topic}, + {attributes, record_info(fields, mqtt_topic)}]), %% local subscriber table, not shared with other nodes - ok = emqttd_mnesia:create_table(topic_subscriber, [ + ok = emqttd_mnesia:create_table(subscriber, [ {type, bag}, {ram_copies, [node()]}, - {record_name, topic_subscriber}, - {attributes, record_info(fields, topic_subscriber)}, + {record_name, mqtt_subscriber}, + {attributes, record_info(fields, mqtt_subscriber)}, {index, [subpid]}, {local_content, true}]); mnesia(replicate) -> ok = emqttd_mnesia:copy_table(topic), - ok = emqttd_mnesia:copy_table(topic_subscriber). + ok = emqttd_mnesia:copy_table(subscriber). %%%============================================================================= %%% API @@ -105,7 +103,8 @@ start_link() -> %%------------------------------------------------------------------------------ -spec create(binary()) -> ok. create(Topic) when is_binary(Topic) -> - {atomic, ok} = mnesia:transaction(fun insert_topic/1, [emqttd_topic:new(Topic)]), ok. + Record = #mqtt_topic{topic = Topic, node = node()}, + {atomic, ok} = mnesia:transaction(fun insert_topic/1, [Record]), ok. %%------------------------------------------------------------------------------ %% @doc @@ -128,7 +127,7 @@ subscribe(Topics = [{_Topic, _Qos}|_]) -> -spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}. subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - TopicRecord = emqttd_topic:new(Topic), + TopicRecord = #mqtt_topic{topic = Topic, node = node()}, Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()}, F = fun() -> case insert_topic(TopicRecord) of @@ -150,9 +149,10 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> SubPid = self(), - TopicRecord = emqttd_topic:new(Topic), + TopicRecord = #mqtt_topic{topic = Topic, node = node()}, F = fun() -> - Pattern = #topic_subscriber{topic = Topic, _ = '_', subpid = SubPid}, + %%TODO record name... + Pattern = #mqtt_subscriber{topic = Topic, _ = '_', subpid = SubPid}, [mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)], try_remove_topic(TopicRecord) end, @@ -173,7 +173,7 @@ publish(Msg=#mqtt_message{topic=Topic}) -> -spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). publish(Topic, Msg) when is_binary(Topic) -> - lists:foreach(fun(#topic{name=Name, node=Node}) -> + lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) -> case Node =:= node() of true -> dispatch(Name, Msg); false -> rpc:cast(Node, ?MODULE, dispatch, [Name, Msg]) @@ -194,7 +194,7 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> setstats(dropped); Subscribers -> lists:foreach( - fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> + fun(#mqtt_subscriber{qos = SubQos, subpid=SubPid}) -> Msg1 = if Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; true -> Msg @@ -210,7 +210,7 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> %% %% @end %%------------------------------------------------------------------------------ --spec match(Topic :: binary()) -> [topic()]. +-spec match(Topic :: binary()) -> [mqtt_topic()]. match(Topic) when is_binary(Topic) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]), lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). @@ -235,7 +235,7 @@ handle_cast(Msg, State) -> lager:error("Bad Msg: ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, +handle_info({mnesia_table_event, {write, #mqtt_subscriber{subpid = Pid}, _ActivityId}}, State = #state{submap = SubMap}) -> NewSubMap = case maps:is_key(Pid, SubMap) of @@ -247,7 +247,7 @@ handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _Activ setstats(subscribers), {noreply, State#state{submap = NewSubMap}}; -handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) -> +handle_info({mnesia_table_event, {write, #mqtt_topic{}, _ActivityId}}, State) -> %%TODO: this is not right when clusterd. setstats(topics), {noreply, State}; diff --git a/apps/emqttd/src/emqttd_queue.erl b/apps/emqttd/src/emqttd_queue.erl index 0f825dd81..c2c68363d 100644 --- a/apps/emqttd/src/emqttd_queue.erl +++ b/apps/emqttd/src/emqttd_queue.erl @@ -29,7 +29,7 @@ -module(emqttd_queue). --include("emqttd_packet.hrl"). +-include("emqttd.hrl"). -export([new/1, new/2, in/3, all/1, clear/1]). @@ -66,7 +66,7 @@ in(ClientId, Message = #mqtt_message{qos = Qos}, Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)}; false -> % full if - Qos =:= ?QOS_0 -> + Qos =:= 0 -> lager:warning("Queue ~s drop qos0 message: ~p", [ClientId, Message]), Wrapper; true -> diff --git a/apps/emqttd/src/emqttd_retained.erl b/apps/emqttd/src/emqttd_retained.erl index 748b051d7..8bf03e915 100644 --- a/apps/emqttd/src/emqttd_retained.erl +++ b/apps/emqttd/src/emqttd_retained.erl @@ -30,8 +30,6 @@ -include("emqttd.hrl"). --include("emqttd_packet.hrl"). - -define(RETAINED_TABLE, message_retained). %% API Function Exports @@ -83,12 +81,12 @@ env() -> CPid :: pid(). redeliver(Topics, CPid) when is_pid(CPid) -> lists:foreach(fun(Topic) -> - case emqttd_topic:wildcard(Topic) of + case emqtt_topic:wildcard(Topic) of false -> dispatch(CPid, mnesia:dirty_read(message_retained, Topic)); true -> Fun = fun(Msg = #message_retained{topic = Name}, Acc) -> - case emqttd_topic:match(Name, Topic) of + case emqtt_topic:match(Name, Topic) of true -> [Msg|Acc]; false -> Acc end diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index 5006e8f59..48a3886b5 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -23,7 +23,7 @@ %%route chain... statistics -module(emqttd_router). --include("emqttd_packet.hrl"). +-include("emqttd.hrl"). -behaviour(gen_server). diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 30967e6f2..757f96a21 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -28,8 +28,6 @@ -include("emqttd.hrl"). --include("emqttd_packet.hrl"). - %% API Function Exports -export([start/1, resume/3, diff --git a/apps/emqttd/src/emqttd_trie.erl b/apps/emqttd/src/emqttd_trie.erl index ee80d6f99..6938bb1a5 100644 --- a/apps/emqttd/src/emqttd_trie.erl +++ b/apps/emqttd/src/emqttd_trie.erl @@ -108,7 +108,7 @@ insert(Topic) when is_binary(Topic) -> mnesia:write(TrieNode#trie_node{topic=Topic}); [] -> %add trie path - [add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], + [add_path(Triple) || Triple <- emqtt_topic:triples(Topic)], %add last node mnesia:write(#trie_node{node_id=Topic, topic=Topic}) end. @@ -121,7 +121,7 @@ insert(Topic) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec find(Topic :: binary()) -> list(MatchedTopic :: binary()). find(Topic) when is_binary(Topic) -> - TrieNodes = match_node(root, emqttd_topic:words(Topic), []), + TrieNodes = match_node(root, emqtt_topic:words(Topic), []), [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined]. %%------------------------------------------------------------------------------ @@ -135,7 +135,7 @@ delete(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of [#trie_node{edge_count=0}] -> mnesia:delete({trie_node, Topic}), - delete_path(lists:reverse(emqttd_topic:triples(Topic))); + delete_path(lists:reverse(emqtt_topic:triples(Topic))); [TrieNode] -> mnesia:write(TrieNode#trie_node{topic=Topic}); [] ->