diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index e34ac3a70..4c88059e6 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -290,6 +290,13 @@ connected(info, Deliver = {deliver, _Topic, _Msg}, shutdown(Reason, State#state{proto_state = NProtoState}) end; +%% TODO: Improve later. +connected(info, {subscribe, TopicFilters}, State) -> + handle_request({subscribe, TopicFilters}, State); + +connected(info, {unsubscribe, TopicFilters}, State) -> + handle_request({unsubscribe, TopicFilters}, State); + %% Keepalive timer connected(info, {keepalive, check}, State = #state{keepalive = KeepAlive}) -> case emqx_keepalive:check(KeepAlive) of @@ -451,6 +458,17 @@ terminate(Reason, _StateName, #state{transport = Transport, ok = emqx_keepalive:cancel(KeepAlive), emqx_protocol:terminate(Reason, ProtoState). +%%-------------------------------------------------------------------- +%% Handle internal request + +handle_request(Req, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:handle_req(Req, ProtoState) of + {ok, _Result, NProtoState} -> %% TODO:: how to handle the result? + keep_state(State#state{proto_state = NProtoState}); + {error, Reason, NProtoState} -> + shutdown(Reason, State#state{proto_state = NProtoState}) + end. + %%-------------------------------------------------------------------- %% Process incoming data diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 9c96bd774..97f7a9929 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -33,39 +33,50 @@ , unload/1 ]). --define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- load(Env) -> - emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), - emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). + emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}), + emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}). on_client_connected(#{client_id := ClientId, username := Username, - peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> - Attrs = #{},%maps:filter(fun(K, _) -> - % lists:member(K, ?ATTR_KEYS) - % end, ConnAttrs), - case emqx_json:safe_encode(Attrs#{clientid => ClientId, - username => Username, - ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), - connack => ConnAck, - ts => erlang:system_time(millisecond) - }) of + peername := {IpAddr, _} + }, ConnAck, + #{session := #{clean_start := CleanStart, + expiry_interval := Interval + }, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive + }, Env) -> + + case emqx_json:safe_encode(#{clientid => ClientId, + username => Username, + ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + clean_start => CleanStart, + expiry_interval => Interval, + connack => ConnAck, + ts => erlang:system_time(millisecond) + }) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> ?LOG(error, "Encoding connected event error: ~p", [Reason]) end. -on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> - case emqx_json:safe_encode([{clientid, ClientId}, - {username, Username}, - {reason, reason(Reason)}, - {ts, erlang:system_time(millisecond)}]) of +on_client_disconnected(#{client_id := ClientId, + username := Username}, Reason, Env) -> + case emqx_json:safe_encode(#{clientid => ClientId, + username => Username, + reason => reason(Reason), + ts => erlang:system_time(millisecond) + }) of {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> @@ -73,8 +84,8 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E end. unload(_Env) -> - emqx_hooks:del('client.connected', fun ?MODULE:on_client_connected/4), - emqx_hooks:del('client.disconnected', fun ?MODULE:on_client_disconnected/3). + emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), + emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). message(QoS, Topic, Payload) -> emqx_message:set_flag( @@ -91,3 +102,4 @@ qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. + diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index cc4d5cda7..b00bef4f4 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -22,28 +22,32 @@ -include_lib("emqx_mqtt.hrl"). %% APIs --export([on_session_created/3]). +-export([on_client_connected/4]). %% emqx_gen_mod callbacks -export([ load/1 , unload/1 ]). -%%------------------------------------------------------------------------------ + +%%-------------------------------------------------------------------- %% Load/Unload Hook -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- load(Topics) -> - emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]). + emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). -on_session_created(#{client_id := ClientId}, SessAttrs, Topics) -> - Username = proplists:get_value(username, SessAttrs), +on_client_connected(#{client_id := ClientId, + username := Username, + conn_mod := ConnMod + }, ?RC_SUCCESS, _ConnAttrs, Topics) -> Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, - emqx_session:subscribe(self(), [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics]). + TopicFilters = [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics], + self() ! {subscribe, TopicFilters}. unload(_) -> - emqx_hooks:del('session.created', fun ?MODULE:on_session_created/3). + emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). %%------------------------------------------------------------------------------ %% Internal functions diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index e5298656d..218cf0fdc 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -32,6 +32,7 @@ -export([ init/2 , handle_in/2 + , handle_req/2 , handle_deliver/2 , handle_out/2 , handle_timeout/3 @@ -264,6 +265,31 @@ handle_in(Packet, PState) -> io:format("In: ~p~n", [Packet]), {ok, PState}. +%%-------------------------------------------------------------------- +%% Handle internal request +%%-------------------------------------------------------------------- + +-spec(handle_req(Req:: term(), proto_state()) + -> {ok, Result :: term(), proto_state()} | + {error, Reason :: term(), proto_state()}). +handle_req({subscribe, TopicFilters}, PState = #protocol{client = Client}) -> + TopicFilters1 = emqx_hooks:run_fold('client.subscribe', + [Client, #{'Internal' => true}], + parse(subscribe, TopicFilters)), + {ReasonCodes, NPState} = process_subscribe(TopicFilters1, PState), + {ok, ReasonCodes, NPState}; + +handle_req({unsubscribe, TopicFilters}, PState = #protocol{client = Client}) -> + TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', + [Client, #{'Internal' => true}], + parse(unsubscribe, TopicFilters)), + {ReasonCodes, NPState} = process_unsubscribe(TopicFilters1, PState), + {ok, ReasonCodes, NPState}; + +handle_req(Req, PState) -> + ?LOG(error, "Unexpected request: ~p~n", [Req]), + {ok, ignored, PState}. + %%-------------------------------------------------------------------- %% Handle delivers %%-------------------------------------------------------------------- @@ -306,14 +332,14 @@ handle_out({connack, ?RC_SUCCESS, SP}, %% subscribe requests or responses that are not intended for them. AckProps1 = if AckProps == undefined -> #{}; true -> AckProps end, AckProps2 = AckProps1#{'Retain-Available' => flag(Retain), - 'Maximum-Packet-Size' => MaxPktSize, - 'Topic-Alias-Maximum' => MaxAlias, - 'Wildcard-Subscription-Available' => flag(Wildcard), - 'Subscription-Identifier-Available' => 1, - %'Response-Information' => - 'Shared-Subscription-Available' => flag(Shared), - 'Maximum-QoS' => MaxQoS - }, + 'Maximum-Packet-Size' => MaxPktSize, + 'Topic-Alias-Maximum' => MaxAlias, + 'Wildcard-Subscription-Available' => flag(Wildcard), + 'Subscription-Identifier-Available' => 1, + %'Response-Information' => + 'Shared-Subscription-Available' => flag(Shared), + 'Maximum-QoS' => MaxQoS + }, AckProps3 = case emqx_zone:get_env(Zone, server_keepalive) of undefined -> AckProps2; Keepalive -> AckProps2#{'Server-Keep-Alive' => Keepalive} @@ -334,7 +360,7 @@ handle_out({connack, ReasonCode}, PState = #protocol{client = Client, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState}; -handle_out({publish, Publishes}, PState = #protocol{client = Client}) -> +handle_out({publish, Publishes}, PState) -> Packets = [element(2, handle_out(Publish, PState)) || Publish <- Publishes], {ok, Packets, PState}; @@ -808,6 +834,26 @@ do_unsubscribe(TopicFilter, _SubOpts, PState = #protocol{client = Client, is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true). +%%-------------------------------------------------------------------- +%% Parse topic filters +%%-------------------------------------------------------------------- + +parse(subscribe, TopicFilters) -> + [emqx_topic:parse(TopicFilter, SubOpts) || {TopicFilter, SubOpts} <- TopicFilters]; + +parse(unsubscribe, TopicFilters) -> + lists:map(fun emqx_topic:parse/1, TopicFilters). + +%%-------------------------------------------------------------------- +%% Mount/Unmount +%%-------------------------------------------------------------------- + +mount(#{mountpoint := MountPoint}, TopicOrMsg) -> + emqx_mountpoint:mount(MountPoint, TopicOrMsg). + +unmount(#{mountpoint := MountPoint}, TopicOrMsg) -> + emqx_mountpoint:unmount(MountPoint, TopicOrMsg). + %%-------------------------------------------------------------------- %% Pipeline %%-------------------------------------------------------------------- @@ -828,16 +874,6 @@ pipeline([Fun|More], Packet, PState) -> {error, ReasonCode, NPState} end. -%%-------------------------------------------------------------------- -%% Mount/Unmount -%%-------------------------------------------------------------------- - -mount(#{mountpoint := MountPoint}, TopicOrMsg) -> - emqx_mountpoint:mount(MountPoint, TopicOrMsg). - -unmount(#{mountpoint := MountPoint}, TopicOrMsg) -> - emqx_mountpoint:unmount(MountPoint, TopicOrMsg). - %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 09a6fb334..11349edbc 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -322,6 +322,12 @@ websocket_info({timeout, Timer, Msg}, stop(Reason, State#state{proto_state = NProtoState}) end; +websocket_info({subscribe, TopicFilters}, State) -> + handle_request({subscribe, TopicFilters}, State); + +websocket_info({unsubscribe, TopicFilters}, State) -> + handle_request({unsubscribe, TopicFilters}, State); + websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]), stop(discard, State); @@ -381,6 +387,17 @@ ensure_keepalive(Interval, #state{proto_state = ProtoState}) -> keepalive_backoff, 0.75), emqx_keepalive:start(stat_fun(), round(Interval * Backoff), {keepalive, check}). +%%-------------------------------------------------------------------- +%% Handle internal request + +handle_request(Req, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:handle_req(Req, ProtoState) of + {ok, _Result, NProtoState} -> %% TODO:: how to handle the result? + {ok, State#state{proto_state = NProtoState}}; + {error, Reason, NProtoState} -> + stop(Reason, State#state{proto_state = NProtoState}) + end. + %%-------------------------------------------------------------------- %% Process incoming data