diff --git a/docs/source/coap.rst b/docs/source/coap.rst index 32d482e17..41d73e618 100644 --- a/docs/source/coap.rst +++ b/docs/source/coap.rst @@ -1,3 +1,8 @@ -============== + +.. _coap: + +============= CoAP Protocol -============== +============= + + diff --git a/docs/source/mqtt-sn.rst b/docs/source/mqtt-sn.rst new file mode 100644 index 000000000..22bcabf6f --- /dev/null +++ b/docs/source/mqtt-sn.rst @@ -0,0 +1,10 @@ + +.. _mqtt_sn: + +TODO:... + +================ +MQTT-SN Protocol +================ + + diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 0fc868355..4fd6749f0 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -44,41 +44,13 @@ -define(PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)). -%%-------------------------------------------------------------------- -%% MQTT Topic -%%-------------------------------------------------------------------- --record(mqtt_topic, { - topic :: binary(), - flags :: [retained | static] -}). - --type(mqtt_topic() :: #mqtt_topic{}). - -%%-------------------------------------------------------------------- -%% MQTT Route -%%-------------------------------------------------------------------- --record(mqtt_route, { - topic :: binary(), - node :: node() -}). - --type(mqtt_route() :: #mqtt_route{}). - -%%-------------------------------------------------------------------- -%% MQTT Subscription -%%-------------------------------------------------------------------- --record(mqtt_subscription, { - subid :: binary() | atom() | pid(), - topic :: binary(), - qos = 0 :: 0 | 1 | 2 -}). - --type(mqtt_subscription() :: #mqtt_subscription{}). - %%-------------------------------------------------------------------- %% MQTT Client %%-------------------------------------------------------------------- +-type(ws_header_key() :: atom() | binary() | string()). +-type(ws_header_val() :: atom() | binary() | string() | integer()). + -record(mqtt_client, { client_id :: binary() | undefined, client_pid :: pid(), @@ -88,9 +60,7 @@ proto_ver :: 3 | 4, keepalive = 0, will_topic :: undefined | binary(), - token :: binary() | undefined, %% auth token - cookie :: binary() | undefined, %% auth cookie - %%ws_initial_headers :: list({ws_header_key(), ws_header_val()}), + ws_initial_headers :: list({ws_header_key(), ws_header_val()}), connected_at :: erlang:timestamp() }). @@ -117,6 +87,7 @@ -record(mqtt_message, { msgid :: mqtt_msgid(), %% Global unique message ID pktid :: mqtt_pktid(), %% PacketId + from :: {binary(), undefined | binary()}, %% ClientId and Username topic :: binary(), %% Topic that the message is published to qos = 0 :: 0 | 1 | 2, %% Message QoS flags = [] :: [retain | dup | sys], %% Message Flags @@ -135,13 +106,22 @@ %%-------------------------------------------------------------------- -record(mqtt_delivery, { sender :: pid(), %% Pid of the sender/publisher - from :: binary(), - message :: mqtt_message(), %% Message + message :: mqtt_message(), %% Message flows :: list() }). -type(mqtt_delivery() :: #mqtt_delivery{}). +%%-------------------------------------------------------------------- +%% MQTT Route +%%-------------------------------------------------------------------- +-record(mqtt_route, { + topic :: binary(), + node :: node() +}). + +-type(mqtt_route() :: #mqtt_route{}). + %%-------------------------------------------------------------------- %% MQTT Alarm %%-------------------------------------------------------------------- diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 61b82f02d..8a5e5d0ca 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -26,7 +26,7 @@ {?MQTT_PROTO_V31, <<"MQIsdp">>}, {?MQTT_PROTO_V311, <<"MQTT">>}]). --type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311. +-type(mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311). %%-------------------------------------------------------------------- %% MQTT QoS @@ -41,11 +41,11 @@ -define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)). --type mqtt_qos() :: ?QOS0 | ?QOS1 | ?QOS2. +-type(mqtt_qos() :: ?QOS0 | ?QOS1 | ?QOS2). --type mqtt_qos_name() :: qos0 | at_most_once | +-type(mqtt_qos_name() :: qos0 | at_most_once | qos1 | at_least_once | - qos2 | exactly_once. + qos2 | exactly_once). -define(QOS_I(Name), begin @@ -102,7 +102,7 @@ 'PINGRESP', 'DISCONNECT']). --type mqtt_packet_type() :: ?RESERVED..?DISCONNECT. +-type(mqtt_packet_type() :: ?RESERVED..?DISCONNECT). %%-------------------------------------------------------------------- %% MQTT Connect Return Codes @@ -114,7 +114,7 @@ -define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed -define(CONNACK_AUTH, 5). %% Client is not authorized to connect --type mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH. +-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH). %%-------------------------------------------------------------------- %% MQTT Parser and Serializer @@ -135,8 +135,9 @@ %%-------------------------------------------------------------------- %% MQTT Packets %%-------------------------------------------------------------------- --type mqtt_client_id() :: binary(). --type mqtt_packet_id() :: 1..16#ffff | undefined. +-type(mqtt_client_id() :: binary()). +-type(mqtt_username() :: binary() | undefined). +-type(mqtt_packet_id() :: 1..16#ffff | undefined). -record(mqtt_packet_connect, { client_id = <<>> :: mqtt_client_id(), diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 6c77ddcd5..f346be653 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -491,13 +491,13 @@ print(Routes = [#mqtt_route{topic = Topic} | _]) -> Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], ?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]); -print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) -> - TopicTable = [io_lib:format("~s:~w", [Topic, Qos]) - || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], - ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]); +%% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) -> +%% TopicTable = [io_lib:format("~s:~w", [Topic, Qos]) +%% || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], +%% ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]); -print(Topics = [#mqtt_topic{}|_]) -> - foreach(fun print/1, Topics); +%% print(Topics = [#mqtt_topic{}|_]) -> +%% foreach(fun print/1, Topics); print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", @@ -509,8 +509,8 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = User [ClientId, CleanSess, Username, emqttd_net:format(Peername), emqttd_time:now_to_secs(ConnectedAt)]); -print(#mqtt_topic{topic = Topic, flags = Flags}) -> - ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]); +%% print(#mqtt_topic{topic = Topic, flags = Flags}) -> +%% ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]); print(#mqtt_route{topic = Topic, node = Node}) -> ?PRINT("~s -> ~s~n", [Topic, Node]); diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 7a3632d0e..c0b5ffb3b 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -81,8 +81,7 @@ from_packet(#mqtt_packet_connect{client_id = ClientId, will_msg = Msg}) -> #mqtt_message{msgid = msgid(Qos), topic = Topic, - from = ClientId, - sender = Username, + from = {ClientId, Username}, retain = Retain, qos = Qos, dup = false, @@ -95,7 +94,7 @@ from_packet(ClientId, Packet) -> from_packet(Username, ClientId, Packet) -> Msg = from_packet(Packet), - Msg#mqtt_message{from = ClientId, sender = Username}. + Msg#mqtt_message{from = {ClientId, Username}}. msgid(?QOS_0) -> undefined; @@ -150,10 +149,10 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) -> unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message -format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, sender = Sender, +format(#mqtt_message{msgid = MsgId, pktid = PktId, from = {ClientId, Username}, qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> - io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Sender=~s, Topic=~s)", - [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Sender, Topic]). + io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)", + [i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]). i(true) -> 1; i(false) -> 0; diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 89228df78..7815e88be 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -45,7 +45,7 @@ on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId, emqttd:publish(emqttd_message:set_flag(sys, Msg)), {ok, Client}. -on_client_disconnected(Reason, ClientId, Opts) -> +on_client_disconnected(Reason, #mqtt_client{client_id = ClientId}, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {reason, reason(Reason)}, {ts, emqttd_time:now_to_secs()}]), diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index d23654bdc..edd6ac41a 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -40,13 +40,13 @@ load(Opts) -> emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]) end. -rewrite_subscribe(_ClientId, TopicTable, Sections) -> - lager:info("Rewrite subscribe: ~p", [TopicTable]), - {ok, [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]}. +rewrite_subscribe({_ClientId, _Username}, {Topic, Opts}, Sections) -> + lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]), + {ok, {match_topic(Topic, Sections), Opts}}. -rewrite_unsubscribe(_ClientId, Topics, Sections) -> - lager:info("Rewrite unsubscribe: ~p", [Topics]), - {ok, [match_topic(Topic, Sections) || Topic <- Topics]}. +rewrite_unsubscribe({_ClientId, _Username}, {Topic, Opts}, Sections) -> + lager:info("Rewrite unsubscribe: ~p", [{Topic, Opts}]), + {ok, {match_topic(Topic, Sections), Opts}}. rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) -> %%TODO: this will not work if the client is always online. diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index ec6de539f..bfaee267f 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -146,7 +146,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> State2 = maybe_set_clientid(State1), %% Start session - case emqttd_sm:start_session(CleanSess, clientid(State2)) of + case emqttd_sm:start_session(CleanSess, {clientid(State2), Username}) of {ok, Session, SP} -> %% Register the client emqttd_cm:reg(client(State2)), @@ -247,9 +247,9 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), end. -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). -send(Msg, State = #proto_state{client_id = ClientId}) +send(Msg, State = #proto_state{client_id = ClientId, username = Username}) when is_record(Msg, mqtt_message) -> - emqttd:run_hooks('message.delivered', [ClientId], Msg), + emqttd:run_hooks('message.delivered', [{ClientId, Username}], Msg), send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun}) @@ -280,10 +280,11 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) -> %% emqttd_cm:unreg(ClientId); ignore; -shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) -> +shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], State), - send_willmsg(ClientId, WillMsg), - emqttd:run_hooks('client.disconnected', [Error], ClientId), + Client = client(State), + send_willmsg(Client, WillMsg), + emqttd:run_hooks('client.disconnected', [Error], Client), %% let it down %% emqttd_cm:unreg(ClientId). ok. @@ -301,10 +302,10 @@ maybe_set_clientid(State = #proto_state{client_id = NullId}) maybe_set_clientid(State) -> State. -send_willmsg(_ClientId, undefined) -> +send_willmsg(_Client, undefined) -> ignore; -send_willmsg(ClientId, WillMsg) -> - emqttd:publish(WillMsg#mqtt_message{from = ClientId}). +send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> + emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). start_keepalive(0) -> ignore; diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 55eb08b11..0e2df6b2e 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -77,6 +77,9 @@ %% Old Client Pid that has been kickout old_client_pid :: pid(), + %% Username + username :: binary() | undefined, + %% Last packet id of the session packet_id = 1, @@ -136,9 +139,9 @@ "Session(~s): " ++ Format, [State#session.client_id | Args])). %% @doc Start a session. --spec(start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}). -start_link(CleanSess, ClientId, ClientPid) -> - gen_server2:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). +-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}). +start_link(CleanSess, {ClientId, Username}, ClientPid) -> + gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []). %% @doc Resume a session. -spec(resume(pid(), mqtt_client_id(), pid()) -> ok). @@ -208,10 +211,10 @@ unsubscribe(SessPid, Topics) -> gen_server2:cast(SessPid, {unsubscribe, Topics}). %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- -init([CleanSess, ClientId, ClientPid]) -> +init([CleanSess, {ClientId, Username}, ClientPid]) -> process_flag(trap_exit, true), true = link(ClientPid), SessEnv = emqttd_conf:session(), @@ -219,6 +222,7 @@ init([CleanSess, ClientId, ClientPid]) -> clean_sess = CleanSess, client_id = ClientId, client_pid = ClientPid, + username = Username, subscriptions = dict:new(), inflight_queue = [], max_inflight = get_value(max_inflight, SessEnv, 0), @@ -232,7 +236,7 @@ init([CleanSess, ClientId, ClientPid]) -> expired_after = get_value(expired_after, SessEnv) * 60, collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, - emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)), + emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), %% Start statistics {ok, start_collector(Session), hibernate}. @@ -284,68 +288,67 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> - %% TODO: Ugly... - TopicTable0 = lists:map(fun({T, Q}) -> - {T1, Opts} = emqttd_topic:strip(T), - {T1, [{qos, Q} | Opts]} - end, RawTopicTable), - case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of - {ok, TopicTable} -> - ?LOG(info, "Subscribe ~p", [TopicTable], Session), - Subscriptions1 = lists:foldl( - fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) -> - case dict:find(Topic, SubDict) of - {ok, Qos} -> - ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), - SubDict; - {ok, OldQos} -> - emqttd:setqos(Topic, ClientId, Qos), - ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), - dict:store(Topic, Qos, SubDict); - error -> - emqttd:subscribe(Topic, ClientId, Opts), - %%TODO: the design is ugly... - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - emqttd_retainer:dispatch(Topic, self()), +%%TODO: 2.0 FIX - dict:store(Topic, Qos, SubDict) - end - end, Subscriptions, TopicTable), - AckFun([Qos || {_, Qos} <- RawTopicTable]), - emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable), - hibernate(Session#session{subscriptions = Subscriptions1}); - {stop, TopicTable} -> - ?LOG(error, "Cannot subscribe: ~p", [TopicTable], Session), - hibernate(Session) - end; +handle_cast({subscribe, TopicTable, AckFun}, Session = #session{client_id = ClientId, + username = Username, + subscriptions = Subscriptions}) -> + ?LOG(info, "Subscribe ~p", [TopicTable], Session), + {GrantedQos, Subscriptions1} = + lists:foldl(fun({RawTopic, Qos}, {QosAcc, SubDict}) -> + {Topic, Opts} = emqttd_topic:strip(RawTopic), + case emqttd:run_hooks('client.subscribe', [{ClientId, Username}], {Topic, Opts}) of + {ok, {Topic1, Opts1}} -> + NewQos = proplists:get_value(qos, Opts1, Qos), + {[NewQos | QosAcc], case dict:find(Topic, SubDict) of + {ok, NewQos} -> + ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session), + SubDict; + {ok, OldQos} -> + emqttd:setqos(Topic, ClientId, NewQos), + ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], Session), + dict:store(Topic, NewQos, SubDict); + error -> + emqttd:subscribe(Topic1, ClientId, Opts1), + %%TODO: the design is ugly... + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + emqttd_retainer:dispatch(Topic1, self()), + dict:store(Topic1, NewQos, SubDict) + end}; + {stop, _} -> + ?LOG(error, "Cannot subscribe: ~p", [Topic], Session), + {[128 | QosAcc], SubDict} + end + end, {[], Subscriptions}, TopicTable), + AckFun(lists:reverse(GrantedQos)), + %%emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable), + hibernate(Session#session{subscriptions = Subscriptions1}); -handle_cast({unsubscribe, RawTopics}, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> - Topics0 = lists:map(fun(Topic) -> - {T, _Opts} = emqttd_topic:strip(Topic), T - end, RawTopics), - case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of - {ok, Topics} -> - ?LOG(info, "unsubscribe ~p", [Topics], Session), - Subscriptions1 = lists:foldl( - fun(Topic, SubDict) -> - case dict:find(Topic, SubDict) of - {ok, _Qos} -> - emqttd:unsubscribe(Topic, ClientId), - dict:erase(Topic, SubDict); - error -> +%%TODO: 2.0 FIX + +handle_cast({unsubscribe, Topics}, Session = #session{client_id = ClientId, + username = Username, + subscriptions = Subscriptions}) -> + ?LOG(info, "unsubscribe ~p", [Topics], Session), + Subscriptions1 = + lists:foldl(fun(RawTopic, SubDict) -> + {Topic0, _Opts} = emqttd_topic:strip(RawTopic), + case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], Topic0) of + {ok, Topic1} -> + case dict:find(Topic1, SubDict) of + {ok, _Qos} -> + emqttd:unsubscribe(Topic1, ClientId), + dict:erase(Topic1, SubDict); + error -> + SubDict + end; + {stop, _} -> SubDict end end, Subscriptions, Topics), - hibernate(Session#session{subscriptions = Subscriptions1}); - {stop, Topics} -> - ?LOG(info, "Cannot unsubscribe: ~p", [Topics], Session), - hibernate(Session) - end; + hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> ?LOG(warning, "destroyed", [], Session), @@ -391,8 +394,7 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C if CleanSess =:= true -> ?LOG(warning, "CleanSess changed to false.", [], Session), - %% emqttd_sm:unregister_session(CleanSess, ClientId), - emqttd_sm:register_session(ClientId, false, sess_info(Session1)); + emqttd_sm:reg_session(ClientId, false, sess_info(Session1)); CleanSess =:= false -> ok end, @@ -506,7 +508,7 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = end; handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> - emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)), + emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), hibernate(start_collector(Session)); handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, @@ -539,7 +541,7 @@ handle_info(Info, Session) -> terminate(_Reason, #session{client_id = ClientId}) -> emqttd:subscriber_down(ClientId), - emqttd_sm:unregister_session(ClientId). + emqttd_sm:unreg_session(ClientId). code_change(_OldVsn, Session, _Extra) -> {ok, Session}. @@ -656,11 +658,12 @@ await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting Session#session{awaiting_ack = Awaiting1}. acked(PktId, Session = #session{client_id = ClientId, + username = Username, inflight_queue = InflightQ, awaiting_ack = Awaiting}) -> case lists:keyfind(PktId, 1, InflightQ) of {_, Msg} -> - emqttd:run_hooks('message.acked', [ClientId], Msg); + emqttd:run_hooks('message.acked', [{ClientId, Username}], Msg); false -> ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session) end, diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index 2b9ee9496..394cb84d0 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -29,9 +29,9 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc Start a session --spec(start_session(boolean(), binary(), pid()) -> {ok, pid()}). -start_session(CleanSess, ClientId, ClientPid) -> - supervisor:start_child(?MODULE, [CleanSess, ClientId, ClientPid]). +-spec(start_session(boolean(), {binary(), binary() | undefined} , pid()) -> {ok, pid()}). +start_session(CleanSess, {ClientId, Username}, ClientPid) -> + supervisor:start_child(?MODULE, [CleanSess, {ClientId, Username}, ClientPid]). %%-------------------------------------------------------------------- %% Supervisor callbacks diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 43de1e91a..3a978b3fd 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -32,7 +32,7 @@ %% API Function Exports -export([start_link/2]). --export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]). +-export([start_session/2, lookup_session/1, reg_session/3, unreg_session/1]). -export([dispatch/3]). @@ -77,10 +77,10 @@ start_link(Pool, Id) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %% @doc Start a session --spec(start_session(boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}). -start_session(CleanSess, ClientId) -> +-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, any()}). +start_session(CleanSess, {ClientId, Username}) -> SM = gproc_pool:pick_worker(?POOL, ClientId), - call(SM, {start_session, {CleanSess, ClientId, self()}}). + call(SM, {start_session, CleanSess, {ClientId, Username}, self()}). %% @doc Lookup a Session -spec(lookup_session(binary()) -> mqtt_session() | undefined). @@ -91,13 +91,13 @@ lookup_session(ClientId) -> end. %% @doc Register a session with info. --spec(register_session(binary(), boolean(), [tuple()]) -> true). -register_session(ClientId, CleanSess, Properties) -> +-spec(reg_session(binary(), boolean(), [tuple()]) -> true). +reg_session(ClientId, CleanSess, Properties) -> ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). %% @doc Unregister a session. --spec(unregister_session(binary()) -> true). -unregister_session(ClientId) -> +-spec(unreg_session(binary()) -> true). +unreg_session(ClientId) -> ets:delete(mqtt_local_session, ClientId). dispatch(ClientId, Topic, Msg) -> @@ -128,11 +128,11 @@ prioritise_info(_Msg, _Len, _State) -> 2. %% Persistent Session -handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) -> +handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) -> case lookup_session(ClientId) of undefined -> %% Create session locally - create_session(Client, State); + create_session({false, {ClientId, Username}, ClientPid}, State); Session -> case resume_session(Session, ClientPid) of {ok, SessPid} -> @@ -143,7 +143,8 @@ handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State end; %% Transient Session -handle_call({start_session, Client = {true, ClientId, _ClientPid}}, _From, State) -> +handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State) -> + Client = {true, {ClientId, Username}, ClientPid}, case lookup_session(ClientId) of undefined -> create_session(Client, State); @@ -195,8 +196,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Create Session Locally -create_session({CleanSess, ClientId, ClientPid}, State) -> - case create_session(CleanSess, ClientId, ClientPid) of +create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> + case create_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)}; @@ -204,8 +205,8 @@ create_session({CleanSess, ClientId, ClientPid}, State) -> {reply, {error, Error}, State} end. -create_session(CleanSess, ClientId, ClientPid) -> - case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of +create_session(CleanSess, {ClientId, Username}, ClientPid) -> + case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess}, case insert_session(Session) of diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 5ece2255b..ebd16714d 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -17,6 +17,7 @@ -module(emqttd_topic). -import(lists, [reverse/1]). + -export([match/2, validate/1, triples/1, words/1, wildcard/1]). -export([join/1, feed_var/3, systop/1]).