diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index fa0a830e5..8f6cf6e97 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -20,7 +20,6 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). - %% API -export([ start_link/3 , stop/1 @@ -48,7 +47,6 @@ %% Internal callback -export([wakeup_from_hib/2, recvloop/2]). - -record(state, { %% TCP/SSL/UDP/DTLS Wrapped Socket socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index e8a763332..2834c27f6 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -95,9 +95,9 @@ }). -define(DEFAULT_OVERRIDE, - #{ clientid => <<"">> %% Generate clientid by default - , username => <<"${Packet.headers.login}">> - , password => <<"${Packet.headers.passcode}">> + #{ clientid => <<"${ConnInfo.clientid}">> + %, username => <<"${ConnInfo.clientid}">> + %, password => <<"${Packet.headers.passcode}">> }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -189,9 +189,10 @@ stats(#channel{session = Session})-> set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. -enrich_conninfo(?SN_CONNECT_MSG(_Flags, _ProtoId, Duration, _ClientId), +enrich_conninfo(?SN_CONNECT_MSG(_Flags, _ProtoId, Duration, ClientId), Channel = #channel{conninfo = ConnInfo}) -> - NConnInfo = ConnInfo#{ proto_name => <<"MQTT-SN">> + NConnInfo = ConnInfo#{ clientid => ClientId + , proto_name => <<"MQTT-SN">> , proto_ver => <<"1.2">> , clean_start => true , keepalive => Duration @@ -592,9 +593,11 @@ handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) -> case emqx_misc:pipeline( [ fun preproc_subs_type/2 , fun check_subscribe_authz/2 + , fun run_client_subs_hook/2 , fun do_subscribe/2 ], SubPkt, Channel) of - {ok, {TopicId, GrantedQoS}, NChannel} -> + {ok, {TopicId, _TopicName, SubOpts}, NChannel} -> + GrantedQoS = maps:get(qos, SubOpts), SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{qos = GrantedQoS}, TopicId, MsgId, ?SN_RC_ACCEPTED), {ok, outgoing_and_update(SubAck), NChannel}; @@ -610,6 +613,7 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName), Channel) -> case emqx_misc:pipeline( [ fun preproc_unsub_type/2 + , fun run_client_unsub_hook/2 , fun do_unsubscribe/2 ], UnsubPkt, Channel) of {ok, _TopicName, NChannel} -> @@ -841,13 +845,10 @@ check_subscribe_authz({_TopicId, TopicName, _QoS}, {error, ?SN_RC_NOT_AUTHORIZE} end. -do_subscribe({TopicId, TopicName, QoS}, - Channel = #channel{ - ctx = Ctx, - session = Session, - clientinfo = ClientInfo - = #{mountpoint := Mountpoint}}) -> - +run_client_subs_hook({TopicId, TopicName, QoS}, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo}) -> {TopicName1, SubOpts0} = emqx_topic:parse(TopicName), TopicFilters = [{TopicName1, SubOpts0#{qos => QoS}}], case run_hooks(Ctx, 'client.subscribe', @@ -855,19 +856,26 @@ do_subscribe({TopicId, TopicName, QoS}, [] -> ?LOG(warning, "Skip to subscribe ~s, " "due to 'client.subscribe' denied!", [TopicName]), - {ok, Channel}; + {error, ?SN_EXCEED_LIMITATION}; [{NTopicName, NSubOpts}|_] -> - NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName), - NSubOpts1 = maps:merge(?DEFAULT_SUBOPTS, NSubOpts), - case emqx_session:subscribe(ClientInfo, NTopicName1, NSubOpts1, Session) of - {ok, NSession} -> - {ok, {TopicId, QoS}, - Channel#channel{session = NSession}}; - {error, ?RC_QUOTA_EXCEEDED} -> - ?LOG(warning, "Cannot subscribe ~s due to ~s.", - [TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]), - {error, ?SN_EXCEED_LIMITATION} - end + {ok, {TopicId, NTopicName, NSubOpts}, Channel} + end. + +do_subscribe({TopicId, TopicName, SubOpts}, + Channel = #channel{ + session = Session, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}}) -> + NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), + NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of + {ok, NSession} -> + {ok, {TopicId, NTopicName, NSubOpts}, + Channel#channel{session = NSession}}; + {error, ?RC_QUOTA_EXCEEDED} -> + ?LOG(warning, "Cannot subscribe ~s due to ~s.", + [TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]), + {error, ?SN_EXCEED_LIMITATION} end. %%-------------------------------------------------------------------- @@ -899,33 +907,42 @@ preproc_unsub_type(?SN_UNSUBSCRIBE_MSG_TYPE(?SN_SHORT_TOPIC, end, {ok, TopicName, Channel}. -do_unsubscribe(TopicName, - Channel = #channel{ - ctx = Ctx, - session = Session, - clientinfo = ClientInfo - = #{mountpoint := Mountpoint}}) -> +run_client_unsub_hook(TopicName, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + }) -> TopicFilters = [emqx_topic:parse(TopicName)], case run_hooks(Ctx, 'client.unsubscribe', [ClientInfo, #{}], TopicFilters) of [] -> - %% Skip to unsubscribe - {ok, Channel}; - [{NTopicName, NSubOpts}|_] -> - NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName), - NSubOpts1 = maps:merge( - emqx_gateway_utils:default_subopts(), - NSubOpts - ), - case emqx_session:unsubscribe(ClientInfo, NTopicName1, - NSubOpts1, Session) of - {ok, NSession} -> - {ok, Channel#channel{session = NSession}}; - {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> - {ok, Channel} - end + {ok, [], Channel}; + NTopicFilters -> + {ok, NTopicFilters, Channel} end. +do_unsubscribe(TopicFilters, + Channel = #channel{ + session = Session, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}}) -> + NChannel = + lists:foldl(fun({TopicName, SubOpts}, ChannAcc) -> + NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), + NSubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + SubOpts + ), + case emqx_session:unsubscribe(ClientInfo, NTopicName, + NSubOpts, Session) of + {ok, NSession} -> + ChannAcc#channel{session = NSession}; + {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> + ChannAcc + end + end, Channel, TopicFilters), + {ok, TopicFilters, NChannel}. + %%-------------------------------------------------------------------- %% Awake & Asleep @@ -1101,14 +1118,36 @@ message_to_packet(MsgId, Message, | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. -handle_call({subscribe, _Topic, _Subopts}, Channel) -> - reply({error, not_supported_now}, Channel); +handle_call({subscribe, Topic, SubOpts}, Channel) -> + %% XXX: Only support short_topic_name + SubProps = maps:get(sub_props, SubOpts, #{}), + case maps:get(subtype, SubProps, short_topic_name) of + short_topic_name -> + case byte_size(Topic) of + 2 -> + case do_subscribe({?SN_INVALID_TOPIC_ID, + Topic, SubOpts}, Channel) of + {ok, _, NChannel} -> + reply(ok, NChannel); + {error, ?SN_EXCEED_LIMITATION} -> + reply({error, exceed_limitation}, Channel) + end; + _ -> + reply({error, bad_topic_name}, Channel) + end; + predefined_topic_id -> + reply({error, only_support_short_name_topic}, Channel); + _ -> + reply({error, only_support_short_name_topic}, Channel) + end; -handle_call({unsubscribe, _Topic}, Channel) -> - reply({error, not_supported_now}, Channel); +handle_call({unsubscribe, Topic}, Channel) -> + TopicFilters = [emqx_topic:parse(Topic)], + {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel), + reply(ok, NChannel); -handle_call(subscriptions, Channel) -> - reply({error, not_supported_now}, Channel); +handle_call(subscriptions, Channel = #channel{session = Session}) -> + reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel); handle_call(kick, Channel) -> NChannel = ensure_disconnected(kicked, Channel), diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 1e0c5e2d4..9bd2dac1b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -22,7 +22,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). - -import(proplists, [get_value/2, get_value/3]). %% API @@ -548,9 +547,9 @@ check_subscribed_status({SubId, {ParsedTopic, _SubOpts}}, }) -> MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), case lists:keyfind(SubId, 1, Subs) of - {SubId, MountedTopic, _Ack, _SubOpts} -> + {SubId, MountedTopic, _Ack, _} -> ok; - {SubId, _OtherTopic, _Ack, _SubOpts} -> + {SubId, _OtherTopic, _Ack, _} -> {error, "Conflict subscribe id"}; false -> ok