emqtt app
This commit is contained in:
parent
2ce7683da0
commit
c54fb8c0ba
|
@ -24,6 +24,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
-author("feng@emqtt.io").
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -224,16 +225,3 @@
|
||||||
-define(PACKET(Type),
|
-define(PACKET(Type),
|
||||||
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
|
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Message
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(mqtt_message, {
|
|
||||||
qos = ?QOS_0 :: mqtt_qos(),
|
|
||||||
retain = false :: boolean(),
|
|
||||||
dup = false :: boolean(),
|
|
||||||
msgid :: mqtt_packet_id(),
|
|
||||||
topic :: binary(),
|
|
||||||
payload :: binary()}).
|
|
||||||
|
|
||||||
-type mqtt_message() :: #mqtt_message{}.
|
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
{application, emqtt,
|
||||||
|
[
|
||||||
|
{description, "Erlang MQTT Common Library"},
|
||||||
|
{vsn, "0.6.0"},
|
||||||
|
{modules, [
|
||||||
|
emqtt_parser
|
||||||
|
]},
|
||||||
|
{registered, []},
|
||||||
|
{applications, [
|
||||||
|
kernel,
|
||||||
|
stdlib
|
||||||
|
]},
|
||||||
|
{env, []}
|
||||||
|
]}.
|
|
@ -0,0 +1,18 @@
|
||||||
|
-module(emqtt).
|
||||||
|
|
||||||
|
%% emqtt: emqtt library's entry point.
|
||||||
|
|
||||||
|
-export([my_func/0]).
|
||||||
|
|
||||||
|
|
||||||
|
%% API
|
||||||
|
|
||||||
|
my_func() ->
|
||||||
|
ok().
|
||||||
|
|
||||||
|
%% Internals
|
||||||
|
|
||||||
|
ok() ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% End of Module.
|
|
@ -20,15 +20,15 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd packet.
|
%%% emqtt packet.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_packet).
|
-module(emqtt_packet).
|
||||||
|
|
||||||
-author("feng@emqtt.io").
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([protocol_name/1, type_name/1, connack_name/1]).
|
-export([protocol_name/1, type_name/1, connack_name/1]).
|
|
@ -20,15 +20,15 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd received packet parser.
|
%%% emqtt packet parser.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_parser).
|
-module(emqtt_parser).
|
||||||
|
|
||||||
-author("feng@emqtt.io").
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([init/1, parse/2]).
|
-export([init/1, parse/2]).
|
|
@ -20,15 +20,15 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd packet serialiser.
|
%%% emqtt packet serialiser.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_serialiser).
|
-module(emqtt_serialiser).
|
||||||
|
|
||||||
-author("feng@emqtt.io").
|
-author("feng@emqtt.io").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([serialise/1]).
|
-export([serialise/1]).
|
|
@ -20,19 +20,17 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd topic.
|
%%% emqtt topic.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_topic).
|
-module(emqtt_topic).
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
-include("emqttd_topic.hrl").
|
|
||||||
|
|
||||||
-import(lists, [reverse/1]).
|
-import(lists, [reverse/1]).
|
||||||
|
|
||||||
-export([new/1, wildcard/1, match/2, validate/1, triples/1, words/1]).
|
-export([match/2, validate/1, triples/1, words/1, wildcard/1]).
|
||||||
|
|
||||||
%-type type() :: static | dynamic.
|
%-type type() :: static | dynamic.
|
||||||
|
|
||||||
|
@ -48,23 +46,11 @@
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% New Topic
|
%% Is wildcard topic?
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-spec new(binary()) -> topic().
|
-spec wildcard(binary()) -> true | false.
|
||||||
new(Name) when is_binary(Name) ->
|
|
||||||
#topic{name = Name, node = node()}.
|
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%% @doc
|
|
||||||
%% Is Wildcard Topic?
|
|
||||||
%%
|
|
||||||
%% @end
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
-spec wildcard(topic() | binary()) -> true | false.
|
|
||||||
wildcard(#topic{name = Name}) when is_binary(Name) ->
|
|
||||||
wildcard(Name);
|
|
||||||
wildcard(Topic) when is_binary(Topic) ->
|
wildcard(Topic) when is_binary(Topic) ->
|
||||||
wildcard(words(Topic));
|
wildcard(words(Topic));
|
||||||
wildcard([]) ->
|
wildcard([]) ->
|
|
@ -20,20 +20,20 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd_parser tests.
|
%%% emqtt_parser tests.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_parser_tests).
|
-module(emqtt_parser_tests).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
parse_connect_test() ->
|
parse_connect_test() ->
|
||||||
State = emqttd_parser:init([]),
|
State = emqtt_parser:init([]),
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||||
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -45,7 +45,7 @@ parse_connect_test() ->
|
||||||
proto_name = <<"MQIsdp">>,
|
proto_name = <<"MQIsdp">>,
|
||||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)),
|
keep_alive = 60}}, <<>>}, emqtt_parser:parse(V31ConnBin, State)),
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||||
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -57,7 +57,7 @@ parse_connect_test() ->
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)),
|
keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnBin, State)),
|
||||||
|
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
|
||||||
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||||
|
@ -70,7 +70,7 @@ parse_connect_test() ->
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
client_id = <<>>,
|
client_id = <<>>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)),
|
keep_alive = 60 } }, <<>>}, emqtt_parser:parse(V311ConnWithoutClientId, State)),
|
||||||
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
||||||
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -90,11 +90,11 @@ parse_connect_test() ->
|
||||||
will_msg = <<"willmsg">> ,
|
will_msg = <<"willmsg">> ,
|
||||||
username = <<"test">>,
|
username = <<"test">>,
|
||||||
password = <<"public">>}},
|
password = <<"public">>}},
|
||||||
<<>> }, emqttd_parser:parse(ConnBinWithWill, State)),
|
<<>> }, emqtt_parser:parse(ConnBinWithWill, State)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
parse_publish_test() ->
|
parse_publish_test() ->
|
||||||
State = emqttd_parser:init([]),
|
State = emqtt_parser:init([]),
|
||||||
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
|
||||||
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
|
@ -104,7 +104,7 @@ parse_publish_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
||||||
packet_id = 1},
|
packet_id = 1},
|
||||||
payload = <<"hahah">> }, <<>>}, emqttd_parser:parse(PubBin, State)),
|
payload = <<"hahah">> }, <<>>}, emqtt_parser:parse(PubBin, State)),
|
||||||
|
|
||||||
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
|
||||||
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
%DISCONNECT(Qos=0, Retain=false, Dup=false)
|
||||||
|
@ -116,13 +116,13 @@ parse_publish_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
||||||
packet_id = undefined},
|
packet_id = undefined},
|
||||||
payload = <<"hello">> }, <<224,0>>}, emqttd_parser:parse(PubBin1, State)),
|
payload = <<"hello">> }, <<224,0>>}, emqtt_parser:parse(PubBin1, State)),
|
||||||
?assertMatch({ok, #mqtt_packet{
|
?assertMatch({ok, #mqtt_packet{
|
||||||
header = #mqtt_packet_header{type = ?DISCONNECT,
|
header = #mqtt_packet_header{type = ?DISCONNECT,
|
||||||
dup = false,
|
dup = false,
|
||||||
qos = 0,
|
qos = 0,
|
||||||
retain = false}
|
retain = false}
|
||||||
}, <<>>}, emqttd_parser:parse(<<224, 0>>, State)).
|
}, <<>>}, emqtt_parser:parse(<<224, 0>>, State)).
|
||||||
|
|
||||||
parse_puback_test() ->
|
parse_puback_test() ->
|
||||||
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
|
||||||
|
@ -132,7 +132,7 @@ parse_puback_test() ->
|
||||||
dup = false,
|
dup = false,
|
||||||
qos = 0,
|
qos = 0,
|
||||||
retain = false }
|
retain = false }
|
||||||
}, <<>>}, emqttd_parser:parse(PubAckBin, emqttd_parser:init([]))),
|
}, <<>>}, emqtt_parser:parse(PubAckBin, emqtt_parser:init([]))),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
parse_subscribe_test() ->
|
parse_subscribe_test() ->
|
||||||
|
@ -149,7 +149,7 @@ parse_disconnect_test() ->
|
||||||
dup = false,
|
dup = false,
|
||||||
qos = 0,
|
qos = 0,
|
||||||
retain = false}
|
retain = false}
|
||||||
}, <<>>}, emqttd_parser:parse(Bin, emqttd_parser:init([]))).
|
}, <<>>}, emqtt_parser:parse(Bin, emqtt_parser:init([]))).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -20,57 +20,57 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd_serialiser tests.
|
%%% emqtt_serialiser tests.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_serialiser_tests).
|
-module(emqtt_serialiser_tests).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
serialise_connect_test() ->
|
serialise_connect_test() ->
|
||||||
emqttd_serialiser:serialise(?CONNECT_PACKET(#mqtt_packet_connect{})).
|
emqtt_serialiser:serialise(?CONNECT_PACKET(#mqtt_packet_connect{})).
|
||||||
|
|
||||||
serialise_connack_test() ->
|
serialise_connack_test() ->
|
||||||
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
ConnAck = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||||
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
|
variable = #mqtt_packet_connack{ack_flags = 0, return_code = 0}},
|
||||||
?assertEqual(<<32,2,0,0>>, emqttd_serialiser:serialise(ConnAck)).
|
?assertEqual(<<32,2,0,0>>, emqtt_serialiser:serialise(ConnAck)).
|
||||||
|
|
||||||
serialise_publish_test() ->
|
serialise_publish_test() ->
|
||||||
emqttd_serialiser:serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
|
emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_0, <<"Topic">>, undefined, <<"Payload">>)),
|
||||||
emqttd_serialiser:serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)).
|
emqtt_serialiser:serialise(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 938, <<"Payload">>)).
|
||||||
|
|
||||||
serialise_puback_test() ->
|
serialise_puback_test() ->
|
||||||
emqttd_serialiser:serialise(?PUBACK_PACKET(?PUBACK, 10384)).
|
emqtt_serialiser:serialise(?PUBACK_PACKET(?PUBACK, 10384)).
|
||||||
|
|
||||||
serialise_pubrel_test() ->
|
serialise_pubrel_test() ->
|
||||||
emqttd_serialiser:serialise(?PUBREL_PACKET(10384)).
|
emqtt_serialiser:serialise(?PUBREL_PACKET(10384)).
|
||||||
|
|
||||||
serialise_subscribe_test() ->
|
serialise_subscribe_test() ->
|
||||||
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
|
TopicTable = [{<<"TopicQos0">>, ?QOS_0}, {<<"TopicQos1">>, ?QOS_1}, {<<"TopicQos2">>, ?QOS_2}],
|
||||||
emqttd_serialiser:serialise(?SUBSCRIBE_PACKET(10, TopicTable)).
|
emqtt_serialiser:serialise(?SUBSCRIBE_PACKET(10, TopicTable)).
|
||||||
|
|
||||||
serialise_suback_test() ->
|
serialise_suback_test() ->
|
||||||
emqttd_serialiser:serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
|
emqtt_serialiser:serialise(?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128])).
|
||||||
|
|
||||||
serialise_unsubscribe_test() ->
|
serialise_unsubscribe_test() ->
|
||||||
emqttd_serialiser:serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
|
emqtt_serialiser:serialise(?UNSUBSCRIBE_PACKET(10, [<<"Topic1">>, <<"Topic2">>])).
|
||||||
|
|
||||||
serialise_unsuback_test() ->
|
serialise_unsuback_test() ->
|
||||||
emqttd_serialiser:serialise(?UNSUBACK_PACKET(10)).
|
emqtt_serialiser:serialise(?UNSUBACK_PACKET(10)).
|
||||||
|
|
||||||
serialise_pingreq_test() ->
|
serialise_pingreq_test() ->
|
||||||
emqttd_serialiser:serialise(?PACKET(?PINGREQ)).
|
emqtt_serialiser:serialise(?PACKET(?PINGREQ)).
|
||||||
|
|
||||||
serialise_pingresp_test() ->
|
serialise_pingresp_test() ->
|
||||||
emqttd_serialiser:serialise(?PACKET(?PINGRESP)).
|
emqtt_serialiser:serialise(?PACKET(?PINGRESP)).
|
||||||
|
|
||||||
serialise_disconnect_test() ->
|
serialise_disconnect_test() ->
|
||||||
emqttd_serialiser:serialise(?PACKET(?DISCONNECT)).
|
emqtt_serialiser:serialise(?PACKET(?DISCONNECT)).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -19,11 +19,9 @@
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-module(emqttd_topic_tests).
|
-module(emqtt_topic_tests).
|
||||||
|
|
||||||
-include("emqttd_topic.hrl").
|
-import(emqtt_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]).
|
||||||
|
|
||||||
-import(emqttd_topic, [validate/1, wildcard/1, match/2, triples/1, words/1]).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
@ -101,9 +99,9 @@ triples_perf_test() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
type_test() ->
|
type_test() ->
|
||||||
?assertEqual(false, wildcard(#topic{name = <<"/a/b/cdkd">>})),
|
?assertEqual(false, wildcard(<<"/a/b/cdkd">>)),
|
||||||
?assertEqual(true, wildcard(#topic{name = <<"/a/+/d">>})),
|
?assertEqual(true, wildcard(<<"/a/+/d">>)),
|
||||||
?assertEqual(true, wildcard(#topic{name = <<"/a/b/#">>})).
|
?assertEqual(true, wildcard(<<"/a/b/#">>)).
|
||||||
|
|
||||||
words_test() ->
|
words_test() ->
|
||||||
?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)),
|
?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)),
|
|
@ -20,7 +20,7 @@
|
||||||
%% SOFTWARE.
|
%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqtt header.
|
%%% emqttd header.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
@ -37,16 +37,38 @@
|
||||||
-define(ERTS_MINIMUM, "6.0").
|
-define(ERTS_MINIMUM, "6.0").
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% PubSub Type
|
%% PubSub
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-type pubsub() :: publish | subscribe.
|
-type pubsub() :: publish | subscribe.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Topic
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-record(mqtt_topic, {
|
||||||
|
topic :: binary(),
|
||||||
|
node :: node()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type mqtt_topic() :: #mqtt_topic{}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Topic Subscriber
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-record(mqtt_subscriber, {
|
||||||
|
topic :: binary(),
|
||||||
|
qos = 0 :: 0 | 1 | 2,
|
||||||
|
subpid :: pid()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type mqtt_subscriber() :: #mqtt_subscriber{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Client
|
%% MQTT Client
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_client, {
|
-record(mqtt_client, {
|
||||||
client_id,
|
clientid :: binary(),
|
||||||
username
|
username :: binary() | undefined,
|
||||||
|
ipaddr :: inet:ip_address()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type mqtt_client() :: #mqtt_client{}.
|
-type mqtt_client() :: #mqtt_client{}.
|
||||||
|
@ -55,7 +77,7 @@
|
||||||
%% MQTT Session
|
%% MQTT Session
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_session, {
|
-record(mqtt_session, {
|
||||||
client_id,
|
clientid,
|
||||||
session_pid,
|
session_pid,
|
||||||
subscriptions = [],
|
subscriptions = [],
|
||||||
awaiting_ack,
|
awaiting_ack,
|
||||||
|
@ -65,24 +87,21 @@
|
||||||
-type mqtt_session() :: #mqtt_session{}.
|
-type mqtt_session() :: #mqtt_session{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT User Management
|
%% MQTT Message
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(mqtt_user, {
|
|
||||||
username :: binary(),
|
|
||||||
ipaddr :: inet:ip_address(),
|
|
||||||
clientid :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type mqtt_user() :: #mqtt_user{}.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Authorization
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% {subscribe, From, Topic}
|
-type mqtt_msgid() :: undefined | 1..16#ffff.
|
||||||
%% {publish, From, Topic}
|
|
||||||
|
|
||||||
%%TODO: ClientId | Username --> Pub | Sub --> Topics
|
-record(mqtt_message, {
|
||||||
|
%% topic is first for message may be retained
|
||||||
|
topic :: binary(),
|
||||||
|
qos = 0 :: 0 | 1 | 2,
|
||||||
|
retain = false :: boolean(),
|
||||||
|
dup = false :: boolean(),
|
||||||
|
msgid :: mqtt_msgid(),
|
||||||
|
payload :: binary()}).
|
||||||
|
|
||||||
|
-type mqtt_message() :: #mqtt_message{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Plugin
|
%% MQTT Plugin
|
||||||
|
@ -90,11 +109,3 @@
|
||||||
|
|
||||||
-record(mqtt_plugin, {name, version, attrs, description}).
|
-record(mqtt_plugin, {name, version, attrs, description}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Retained Message
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(message_retained, {topic, qos, payload}).
|
|
||||||
|
|
||||||
-type message_retained() :: #message_retained{}.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ compile(who, {user, Username}) ->
|
||||||
{user, bin(Username)};
|
{user, bin(Username)};
|
||||||
|
|
||||||
compile(topic, Topic) ->
|
compile(topic, Topic) ->
|
||||||
Words = emqttd_topic:words(Topic),
|
Words = emqtt_topic:words(Topic),
|
||||||
case 'pattern?'(Words) of
|
case 'pattern?'(Words) of
|
||||||
true -> {pattern, Words};
|
true -> {pattern, Words};
|
||||||
false -> Words
|
false -> Words
|
||||||
|
@ -126,18 +126,18 @@ match_topics(_User, _Topic, []) ->
|
||||||
false;
|
false;
|
||||||
match_topics(User, Topic, [{pattern, PatternFilter}|Filters]) ->
|
match_topics(User, Topic, [{pattern, PatternFilter}|Filters]) ->
|
||||||
TopicFilter = feed_var(User, PatternFilter),
|
TopicFilter = feed_var(User, PatternFilter),
|
||||||
case match_topic(emqttd_topic:words(Topic), TopicFilter) of
|
case match_topic(emqtt_topic:words(Topic), TopicFilter) of
|
||||||
true -> true;
|
true -> true;
|
||||||
false -> match_topics(User, Topic, Filters)
|
false -> match_topics(User, Topic, Filters)
|
||||||
end;
|
end;
|
||||||
match_topics(User, Topic, [TopicFilter|Filters]) ->
|
match_topics(User, Topic, [TopicFilter|Filters]) ->
|
||||||
case match_topic(emqttd_topic:words(Topic), TopicFilter) of
|
case match_topic(emqtt_topic:words(Topic), TopicFilter) of
|
||||||
true -> true;
|
true -> true;
|
||||||
false -> match_topics(User, Topic, Filters)
|
false -> match_topics(User, Topic, Filters)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
match_topic(Topic, TopicFilter) ->
|
match_topic(Topic, TopicFilter) ->
|
||||||
emqttd_topic:match(Topic, TopicFilter).
|
emqtt_topic:match(Topic, TopicFilter).
|
||||||
|
|
||||||
feed_var(User, Pattern) ->
|
feed_var(User, Pattern) ->
|
||||||
feed_var(User, Pattern, []).
|
feed_var(User, Pattern, []).
|
||||||
|
|
|
@ -53,8 +53,8 @@
|
||||||
|
|
||||||
-callback init(AclOpts :: list()) -> {ok, State :: any()}.
|
-callback init(AclOpts :: list()) -> {ok, State :: any()}.
|
||||||
|
|
||||||
-callback check_acl({User, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
|
-callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
|
||||||
User :: mqtt_user(),
|
Client :: mqtt_client(),
|
||||||
PubSub :: pubsub(),
|
PubSub :: pubsub(),
|
||||||
Topic :: binary().
|
Topic :: binary().
|
||||||
|
|
||||||
|
|
|
@ -46,10 +46,10 @@
|
||||||
|
|
||||||
-ifdef(use_specs).
|
-ifdef(use_specs).
|
||||||
|
|
||||||
-callback check(User, Password, State) -> ok | ignore | {error, string()} when
|
-callback check(Client, Password, State) -> ok | ignore | {error, string()} when
|
||||||
User :: mqtt_user(),
|
Client :: mqtt_client(),
|
||||||
Password :: binary(),
|
Password :: binary(),
|
||||||
State :: any().
|
State :: any().
|
||||||
|
|
||||||
-callback description() -> string().
|
-callback description() -> string().
|
||||||
|
|
||||||
|
@ -68,18 +68,18 @@ behaviour_info(_Other) ->
|
||||||
start_link(AuthMods) ->
|
start_link(AuthMods) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthMods], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthMods], []).
|
||||||
|
|
||||||
-spec login(mqtt_user(), undefined | binary()) -> ok | {error, string()}.
|
-spec login(mqtt_client(), undefined | binary()) -> ok | {error, string()}.
|
||||||
login(User, Password) when is_record(User, mqtt_user) ->
|
login(Client, Password) when is_record(Client, mqtt_client) ->
|
||||||
[{_, AuthMods}] = ets:lookup(?AUTH_TABLE, auth_modules),
|
[{_, AuthMods}] = ets:lookup(?AUTH_TABLE, auth_modules),
|
||||||
check(User, Password, AuthMods).
|
check(Client, Password, AuthMods).
|
||||||
|
|
||||||
check(_User, _Password, []) ->
|
check(_Client, _Password, []) ->
|
||||||
{error, "No auth module to check!"};
|
{error, "No auth module to check!"};
|
||||||
check(User, Password, [{Mod, State} | Mods]) ->
|
check(Client, Password, [{Mod, State} | Mods]) ->
|
||||||
case Mod:check(User, Password, State) of
|
case Mod:check(Client, Password, State) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} -> {error, Reason};
|
{error, Reason} -> {error, Reason};
|
||||||
ignore -> check(User, Password, Mods)
|
ignore -> check(Client, Password, Mods)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_module(Mod, Opts) ->
|
add_module(Mod, Opts) ->
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_broker).
|
-module(emqttd_broker).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_systop.hrl").
|
-include("emqttd_systop.hrl").
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||||
|
|
||||||
%%Client State...
|
%%Client State...
|
||||||
-record(state, {transport,
|
-record(state, {transport,
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_event).
|
-module(emqttd_event).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/0,
|
-export([start_link/0,
|
||||||
|
|
|
@ -30,8 +30,6 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
|
||||||
|
|
||||||
-import(proplists, [get_value/2, get_value/3]).
|
-import(proplists, [get_value/2, get_value/3]).
|
||||||
|
|
||||||
-export([handle/1]).
|
-export([handle/1]).
|
||||||
|
@ -87,7 +85,7 @@ validate(qos, Qos) ->
|
||||||
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
|
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
|
||||||
|
|
||||||
validate(topic, Topic) ->
|
validate(topic, Topic) ->
|
||||||
emqttd_topic:validate({name, Topic}).
|
emqtt_topic:validate({name, Topic}).
|
||||||
|
|
||||||
int(S) -> list_to_integer(S).
|
int(S) -> list_to_integer(S).
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||||
|
|
||||||
-export([from_packet/1, to_packet/1]).
|
-export([from_packet/1, to_packet/1]).
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_systop.hrl").
|
-include("emqttd_systop.hrl").
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([init/2, client_id/1]).
|
-export([init/2, client_id/1]).
|
||||||
|
@ -93,7 +93,7 @@ received(_Packet, State = #proto_state{connected = false}) ->
|
||||||
|
|
||||||
received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername,
|
received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername,
|
||||||
client_id = ClientId}) ->
|
client_id = ClientId}) ->
|
||||||
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
|
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
|
||||||
case validate_packet(Packet) of
|
case validate_packet(Packet) of
|
||||||
ok ->
|
ok ->
|
||||||
handle(Packet, State);
|
handle(Packet, State);
|
||||||
|
@ -110,7 +110,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
|
||||||
keep_alive = KeepAlive,
|
keep_alive = KeepAlive,
|
||||||
client_id = ClientId} = Var,
|
client_id = ClientId} = Var,
|
||||||
|
|
||||||
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
|
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
|
||||||
|
|
||||||
State1 = State#proto_state{proto_ver = ProtoVer,
|
State1 = State#proto_state{proto_ver = ProtoVer,
|
||||||
username = Username,
|
username = Username,
|
||||||
|
@ -226,7 +226,7 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
|
||||||
send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession});
|
send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession});
|
||||||
|
|
||||||
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
|
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
|
||||||
lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
|
lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
|
||||||
sent_stats(Packet),
|
sent_stats(Packet),
|
||||||
Data = emqttd_serialiser:serialise(Packet),
|
Data = emqttd_serialiser:serialise(Packet),
|
||||||
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
||||||
|
@ -297,7 +297,7 @@ validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess,
|
||||||
|
|
||||||
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
||||||
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
|
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
|
||||||
case emqttd_topic:validate({name, Topic}) of
|
case emqtt_topic:validate({name, Topic}) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
||||||
end;
|
end;
|
||||||
|
@ -321,7 +321,7 @@ validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
|
||||||
|
|
||||||
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
||||||
ErrTopics = [Topic || {Topic, Qos} <- Topics,
|
ErrTopics = [Topic || {Topic, Qos} <- Topics,
|
||||||
not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
||||||
case ErrTopics of
|
case ErrTopics of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
_ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
|
_ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
|
||||||
|
|
|
@ -30,9 +30,7 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_topic.hrl").
|
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
@ -68,20 +66,20 @@ mnesia(create) ->
|
||||||
ok = emqttd_mnesia:create_table(topic, [
|
ok = emqttd_mnesia:create_table(topic, [
|
||||||
{type, bag},
|
{type, bag},
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, topic},
|
{record_name, mqtt_topic},
|
||||||
{attributes, record_info(fields, topic)}]),
|
{attributes, record_info(fields, mqtt_topic)}]),
|
||||||
%% local subscriber table, not shared with other nodes
|
%% local subscriber table, not shared with other nodes
|
||||||
ok = emqttd_mnesia:create_table(topic_subscriber, [
|
ok = emqttd_mnesia:create_table(subscriber, [
|
||||||
{type, bag},
|
{type, bag},
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, topic_subscriber},
|
{record_name, mqtt_subscriber},
|
||||||
{attributes, record_info(fields, topic_subscriber)},
|
{attributes, record_info(fields, mqtt_subscriber)},
|
||||||
{index, [subpid]},
|
{index, [subpid]},
|
||||||
{local_content, true}]);
|
{local_content, true}]);
|
||||||
|
|
||||||
mnesia(replicate) ->
|
mnesia(replicate) ->
|
||||||
ok = emqttd_mnesia:copy_table(topic),
|
ok = emqttd_mnesia:copy_table(topic),
|
||||||
ok = emqttd_mnesia:copy_table(topic_subscriber).
|
ok = emqttd_mnesia:copy_table(subscriber).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -105,7 +103,8 @@ start_link() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec create(binary()) -> ok.
|
-spec create(binary()) -> ok.
|
||||||
create(Topic) when is_binary(Topic) ->
|
create(Topic) when is_binary(Topic) ->
|
||||||
{atomic, ok} = mnesia:transaction(fun insert_topic/1, [emqttd_topic:new(Topic)]), ok.
|
Record = #mqtt_topic{topic = Topic, node = node()},
|
||||||
|
{atomic, ok} = mnesia:transaction(fun insert_topic/1, [Record]), ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -128,7 +127,7 @@ subscribe(Topics = [{_Topic, _Qos}|_]) ->
|
||||||
|
|
||||||
-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
|
-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
|
||||||
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
||||||
TopicRecord = emqttd_topic:new(Topic),
|
TopicRecord = #mqtt_topic{topic = Topic, node = node()},
|
||||||
Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()},
|
Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()},
|
||||||
F = fun() ->
|
F = fun() ->
|
||||||
case insert_topic(TopicRecord) of
|
case insert_topic(TopicRecord) of
|
||||||
|
@ -150,9 +149,10 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
||||||
-spec unsubscribe(binary() | list(binary())) -> ok.
|
-spec unsubscribe(binary() | list(binary())) -> ok.
|
||||||
unsubscribe(Topic) when is_binary(Topic) ->
|
unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
SubPid = self(),
|
SubPid = self(),
|
||||||
TopicRecord = emqttd_topic:new(Topic),
|
TopicRecord = #mqtt_topic{topic = Topic, node = node()},
|
||||||
F = fun() ->
|
F = fun() ->
|
||||||
Pattern = #topic_subscriber{topic = Topic, _ = '_', subpid = SubPid},
|
%%TODO record name...
|
||||||
|
Pattern = #mqtt_subscriber{topic = Topic, _ = '_', subpid = SubPid},
|
||||||
[mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
|
[mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
|
||||||
try_remove_topic(TopicRecord)
|
try_remove_topic(TopicRecord)
|
||||||
end,
|
end,
|
||||||
|
@ -173,7 +173,7 @@ publish(Msg=#mqtt_message{topic=Topic}) ->
|
||||||
|
|
||||||
-spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any().
|
-spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any().
|
||||||
publish(Topic, Msg) when is_binary(Topic) ->
|
publish(Topic, Msg) when is_binary(Topic) ->
|
||||||
lists:foreach(fun(#topic{name=Name, node=Node}) ->
|
lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
|
||||||
case Node =:= node() of
|
case Node =:= node() of
|
||||||
true -> dispatch(Name, Msg);
|
true -> dispatch(Name, Msg);
|
||||||
false -> rpc:cast(Node, ?MODULE, dispatch, [Name, Msg])
|
false -> rpc:cast(Node, ?MODULE, dispatch, [Name, Msg])
|
||||||
|
@ -194,7 +194,7 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
||||||
setstats(dropped);
|
setstats(dropped);
|
||||||
Subscribers ->
|
Subscribers ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
|
fun(#mqtt_subscriber{qos = SubQos, subpid=SubPid}) ->
|
||||||
Msg1 = if
|
Msg1 = if
|
||||||
Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
|
Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
|
||||||
true -> Msg
|
true -> Msg
|
||||||
|
@ -210,7 +210,7 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec match(Topic :: binary()) -> [topic()].
|
-spec match(Topic :: binary()) -> [mqtt_topic()].
|
||||||
match(Topic) when is_binary(Topic) ->
|
match(Topic) when is_binary(Topic) ->
|
||||||
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]),
|
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]),
|
||||||
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
|
lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
|
||||||
|
@ -235,7 +235,7 @@ handle_cast(Msg, State) ->
|
||||||
lager:error("Bad Msg: ~p", [Msg]),
|
lager:error("Bad Msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}},
|
handle_info({mnesia_table_event, {write, #mqtt_subscriber{subpid = Pid}, _ActivityId}},
|
||||||
State = #state{submap = SubMap}) ->
|
State = #state{submap = SubMap}) ->
|
||||||
NewSubMap =
|
NewSubMap =
|
||||||
case maps:is_key(Pid, SubMap) of
|
case maps:is_key(Pid, SubMap) of
|
||||||
|
@ -247,7 +247,7 @@ handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _Activ
|
||||||
setstats(subscribers),
|
setstats(subscribers),
|
||||||
{noreply, State#state{submap = NewSubMap}};
|
{noreply, State#state{submap = NewSubMap}};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) ->
|
handle_info({mnesia_table_event, {write, #mqtt_topic{}, _ActivityId}}, State) ->
|
||||||
%%TODO: this is not right when clusterd.
|
%%TODO: this is not right when clusterd.
|
||||||
setstats(topics),
|
setstats(topics),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-module(emqttd_queue).
|
-module(emqttd_queue).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-export([new/1, new/2, in/3, all/1, clear/1]).
|
-export([new/1, new/2, in/3, all/1, clear/1]).
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ in(ClientId, Message = #mqtt_message{qos = Qos},
|
||||||
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)};
|
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)};
|
||||||
false -> % full
|
false -> % full
|
||||||
if
|
if
|
||||||
Qos =:= ?QOS_0 ->
|
Qos =:= 0 ->
|
||||||
lager:warning("Queue ~s drop qos0 message: ~p", [ClientId, Message]),
|
lager:warning("Queue ~s drop qos0 message: ~p", [ClientId, Message]),
|
||||||
Wrapper;
|
Wrapper;
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -30,8 +30,6 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
|
||||||
|
|
||||||
-define(RETAINED_TABLE, message_retained).
|
-define(RETAINED_TABLE, message_retained).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
|
@ -83,12 +81,12 @@ env() ->
|
||||||
CPid :: pid().
|
CPid :: pid().
|
||||||
redeliver(Topics, CPid) when is_pid(CPid) ->
|
redeliver(Topics, CPid) when is_pid(CPid) ->
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
case emqttd_topic:wildcard(Topic) of
|
case emqtt_topic:wildcard(Topic) of
|
||||||
false ->
|
false ->
|
||||||
dispatch(CPid, mnesia:dirty_read(message_retained, Topic));
|
dispatch(CPid, mnesia:dirty_read(message_retained, Topic));
|
||||||
true ->
|
true ->
|
||||||
Fun = fun(Msg = #message_retained{topic = Name}, Acc) ->
|
Fun = fun(Msg = #message_retained{topic = Name}, Acc) ->
|
||||||
case emqttd_topic:match(Name, Topic) of
|
case emqtt_topic:match(Name, Topic) of
|
||||||
true -> [Msg|Acc];
|
true -> [Msg|Acc];
|
||||||
false -> Acc
|
false -> Acc
|
||||||
end
|
end
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
%%route chain... statistics
|
%%route chain... statistics
|
||||||
-module(emqttd_router).
|
-module(emqttd_router).
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
|
|
@ -28,8 +28,6 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_packet.hrl").
|
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start/1,
|
-export([start/1,
|
||||||
resume/3,
|
resume/3,
|
||||||
|
|
|
@ -108,7 +108,7 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
mnesia:write(TrieNode#trie_node{topic=Topic});
|
mnesia:write(TrieNode#trie_node{topic=Topic});
|
||||||
[] ->
|
[] ->
|
||||||
%add trie path
|
%add trie path
|
||||||
[add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
|
[add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],
|
||||||
%add last node
|
%add last node
|
||||||
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
|
mnesia:write(#trie_node{node_id=Topic, topic=Topic})
|
||||||
end.
|
end.
|
||||||
|
@ -121,7 +121,7 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()).
|
-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()).
|
||||||
find(Topic) when is_binary(Topic) ->
|
find(Topic) when is_binary(Topic) ->
|
||||||
TrieNodes = match_node(root, emqttd_topic:words(Topic), []),
|
TrieNodes = match_node(root, emqtt_topic:words(Topic), []),
|
||||||
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
|
[Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -135,7 +135,7 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
case mnesia:read(trie_node, Topic) of
|
case mnesia:read(trie_node, Topic) of
|
||||||
[#trie_node{edge_count=0}] ->
|
[#trie_node{edge_count=0}] ->
|
||||||
mnesia:delete({trie_node, Topic}),
|
mnesia:delete({trie_node, Topic}),
|
||||||
delete_path(lists:reverse(emqttd_topic:triples(Topic)));
|
delete_path(lists:reverse(emqtt_topic:triples(Topic)));
|
||||||
[TrieNode] ->
|
[TrieNode] ->
|
||||||
mnesia:write(TrieNode#trie_node{topic=Topic});
|
mnesia:write(TrieNode#trie_node{topic=Topic});
|
||||||
[] ->
|
[] ->
|
||||||
|
|
Loading…
Reference in New Issue