From 22e8b07a3d41602180774178a3d0b06ded7d16ec Mon Sep 17 00:00:00 2001 From: turtled Date: Sun, 19 Aug 2018 20:31:44 +0800 Subject: [PATCH 1/3] Receive/send messages by bridge --- etc/emqx.conf | 241 ++++++++++++++++++++++++++++++------- priv/emqx.schema | 27 ++++- src/emqx_bridge1.erl | 254 +++++++++++++++++++++++++++++++++++++++ src/emqx_bridge1_sup.erl | 45 +++++++ src/emqx_broker.erl | 4 +- src/emqx_message.erl | 4 +- src/emqx_sup.erl | 2 + src/emqx_time.erl | 4 +- 8 files changed, 529 insertions(+), 52 deletions(-) create mode 100644 src/emqx_bridge1.erl create mode 100644 src/emqx_bridge1_sup.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index 83ddcf6a8..6b31c63c3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1497,16 +1497,20 @@ zone.internal.mqueue_store_qos0 = true ## Bridges ##-------------------------------------------------------------------- -## Bridge Type. +##-------------------------------------------------------------------- +## Bridges to edge +##-------------------------------------------------------------------- +## Bridge type. ## -## Value: local | remote -bridge.name.type = local +## Value: Enum +## Example: out | in +bridge.edge.type = in ## Bridge address: node name for local bridge, host:port for remote. ## ## Value: String ## Example: emqx@127.0.0.1, 127.0.0.1:1883 -bridge.name.address = emqx@127.0.0.1 +bridge.edge.address = 127.0.0.1:1883 ## Protocol version of the bridge. ## @@ -1514,76 +1518,221 @@ bridge.name.address = emqx@127.0.0.1 ## - mqtt5 ## - mqtt4 ## - mqtt3 -bridge.name.proto_ver = mqtt4 +bridge.edge.proto_ver = mqtt4 ## The ClientId of a remote bridge. ## ## Value: String -bridge.name.client_id = bridge:$name +bridge.edge.client_id = bridge_edge ## The Clean start flag of a remote bridge. ## ## Value: boolean -bridge.name.clean_start = false +bridge.edge.clean_start = false ## The username for a remote bridge. ## ## Value: String -bridge.name.username = user +bridge.edge.username = user ## The password for a remote bridge. ## ## Value: String -bridge.name.password = passwd +bridge.edge.password = passwd ## Mountpoint of the bridge. ## ## Value: String -bridge.name.mountpoint = bridge/$name/ - -## PEM-encoded CA certificates of the bridge. -## -## Value: File -bridge.name.cacertfile = cacert.pem - -## SSL Certfile of the bridge. -## -## Value: File -bridge.name.certfile = cert.pem - -## SSL Keyfile of the bridge. -## -## Value: File -bridge.name.keyfile = key.pem - -## SSL Ciphers used by the bridge. -## -## Value: String -bridge.name.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 - -## TLS versions used by the bridge. -## -## Value: String -bridge.name.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - -## The pending message queue of a bridge. -## -## Value: Number -bridge.name.max_pending_messages = 10000 +## bridge.edge.mountpoint = bridge/edge/ ## Ping interval of a down bridge. ## ## Value: Duration ## Default: 10 seconds -bridge.name.keepalive = 10s +bridge.edge.keepalive = 10s -## Subscriptions of the bridge. +## Subscriptions of the bridge topic. ## +## Value: String +bridge.edge.subscription.1.topic = # + +## Subscriptions of the bridge qos. +## +## Value: Number +bridge.edge.subscription.1.qos = 1 + +## The pending message queue of a bridge. +## +## Value: Number +bridge.edge.max_pending_messages = 10000 + +## Start type of the bridge. +## +## Value: enum +## manual +## auto +bridge.edge.start_type = manual + +## Bridge reconnect count. +## +## Value: Number +bridge.edge.reconnect_count = 10 + +## Bridge reconnect time. +## +## Value: Duration +## Default: 30 seconds +bridge.edge.reconnect_time = 30s + +## PEM-encoded CA certificates of the bridge. +## +## Value: File +## bridge.edge.cacertfile = cacert.pem + +## SSL Certfile of the bridge. +## +## Value: File +## bridge.edge.certfile = cert.pem + +## SSL Keyfile of the bridge. +## +## Value: File +## bridge.edge.keyfile = key.pem + +## SSL Ciphers used by the bridge. +## +## Value: String +## bridge.edge.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## TLS versions used by the bridge. +## +## Value: String +## bridge.edge.tls_versions = tlsv1.2,tlsv1.1,tlsv1 + + +##-------------------------------------------------------------------- +## Bridges to cloud +##-------------------------------------------------------------------- +## Bridge type. +## +## Value: Enum +## Example: out | in +bridge.cloud.type = out + +## Bridge address: node name for local bridge, host:port for remote. +## +## Value: String +## Example: emqx@127.0.0.1, 127.0.0.1:1883 +bridge.cloud.address = 127.0.0.1:1883 + +## Protocol version of the bridge. +## +## Value: Enum +## - mqtt5 +## - mqtt4 +## - mqtt3 +bridge.cloud.proto_ver = mqtt4 + +## The ClientId of a remote bridge. +## +## Value: String +bridge.cloud.client_id = bridge_cloud + +## The Clean start flag of a remote bridge. +## +## Value: boolean +bridge.cloud.clean_start = false + +## The username for a remote bridge. +## +## Value: String +bridge.cloud.username = user + +## The password for a remote bridge. +## +## Value: String +bridge.cloud.password = passwd + +## Mountpoint of the bridge. +## +## Value: String +bridge.cloud.mountpoint = bridge/edge/${node}/ + +## Ping interval of a down bridge. +## +## Value: Duration ## Default: 10 seconds -bridge.name.subscription.1.topic = topic1/ -bridge.name.subscription.1.qos = 2 -## bridge.name.subscription.2.topic = topic2/ -## bridge.name.subscription.2.qos = 2 +bridge.cloud.keepalive = 10s + +## Forward message topics +## +## Value: String +## Example: topic1/#,topic2/# +bridge.cloud.forward_rule = # + +## Subscriptions of the bridge topic. +## +## Value: String +bridge.cloud.subscription.1.topic = $share/cmd/topic1 + +## Subscriptions of the bridge qos. +## +## Value: Number +bridge.cloud.subscription.1.qos = 1 + +## Bridge store message type. +## +## Value: Enum +## Example: memory | disk +bridge.cloud.store_type = memory + +## The pending message queue of a bridge. +## +## Value: Number +bridge.cloud.max_pending_messages = 10000 + +## Start type of the bridge. +## +## Value: enum +## manual +## auto +bridge.cloud.start_type = manual + +## Bridge reconnect count. +## +## Value: Number +bridge.cloud.reconnect_count = 10 + +## Bridge reconnect time. +## +## Value: Duration +## Default: 30 seconds +bridge.cloud.reconnect_time = 30s + +## PEM-encoded CA certificates of the bridge. +## +## Value: File +## bridge.cloud.cacertfile = cacert.pem + +## SSL Certfile of the bridge. +## +## Value: File +## bridge.cloud.certfile = cert.pem + +## SSL Keyfile of the bridge. +## +## Value: File +## bridge.cloud.keyfile = key.pem + +## SSL Ciphers used by the bridge. +## +## Value: String +## bridge.cloud.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## TLS versions used by the bridge. +## +## Value: String +## bridge.cloud.tls_versions = tlsv1.2,tlsv1.1,tlsv1 ##-------------------------------------------------------------------- ## Modules diff --git a/priv/emqx.schema b/priv/emqx.schema index c502a8d24..1626b645f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1395,8 +1395,11 @@ end}. %%-------------------------------------------------------------------- {mapping, "bridge.$name.type", "emqx.bridges", [ - {default, local}, - {datatype, {enum, [local,remote]}} + {datatype, {enum, [in, out]}} +]}. + +{mapping, "bridge.$name.store_type", "emqx.bridges", [ + {datatype, {enum, [memory, disk]}} ]}. {mapping, "bridge.$name.address", "emqx.bridges", [ @@ -1428,6 +1431,10 @@ end}. {datatype, string} ]}. +{mapping, "bridge.$name.forward_rule", "emqx.bridges", [ + {datatype, string} +]}. + {mapping, "bridge.$name.cacertfile", "emqx.bridges", [ {datatype, string} ]}. @@ -1466,6 +1473,22 @@ end}. {datatype, integer} ]}. +{mapping, "bridge.$name.start_type", "emqx.bridges", [ + {datatype, {enum, [manual, auto]}}, + {default, auto} +]}. + +{mapping, "bridge.$name.reconnect_count", "emqx.bridges", [ + {default, 10}, + {datatype, integer} +]}. + +{mapping, "bridge.$name.reconnect_time", "emqx.bridges", [ + {default, "30s"}, + {datatype, {duration, s}} +]}. + + {translation, "emqx.bridges", fun(Conf) -> Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, diff --git a/src/emqx_bridge1.erl b/src/emqx_bridge1.erl new file mode 100644 index 000000000..139711932 --- /dev/null +++ b/src/emqx_bridge1.erl @@ -0,0 +1,254 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge1). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + + -import(proplists, [get_value/2, get_value/3]). + +-export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-record(state, {client_pid, options, reconnect_time, reconnect_count, + def_reconnect_count, type, mountpoint, queue, store_type, + max_pending_messages}). + +-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, + packet_id, topic, props, payload}). + +start_link(Name, Options) -> + gen_server:start_link({local, name(Name)}, ?MODULE, [Options], []). + +start_bridge(Name) -> + gen_server:call(name(Name), start_bridge). + +stop_bridge(Name) -> + gen_server:call(name(Name), stop_bridge). + +status(Pid) -> + gen_server:call(Pid, status). + +%%------------------------------------------------------------------------------ +%% gen_server callbacks +%%------------------------------------------------------------------------------ + +init([Options]) -> + process_flag(trap_exit, true), + case get_value(start_type, Options, manual) of + manual -> ok; + auto -> erlang:send_after(1000, self(), start) + end, + ReconnectCount = get_value(reconnect_count, Options, 10), + ReconnectTime = get_value(reconnect_time, Options, 30000), + MaxPendingMsg = get_value(max_pending_messages, Options, 10000), + Mountpoint = format_mountpoint(get_value(mountpoint, Options)), + StoreType = get_value(store_type, Options, memory), + Type = get_value(type, Options, in), + Queue = [], + {ok, #state{type = Type, + mountpoint = Mountpoint, + queue = Queue, + store_type = StoreType, + options = Options, + reconnect_count = ReconnectCount, + reconnect_time = ReconnectTime, + def_reconnect_count = ReconnectCount, + max_pending_messages = MaxPendingMsg}}. + +handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> + {noreply, NewState} = handle_info(start, State), + {reply, <<"start bridge successfully">>, NewState}; + +handle_call(start_bridge, _From, State) -> + {reply, <<"bridge already started">>, State}; + +handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> + {reply, <<"bridge not started">>, State}; + +handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> + emqx_client:disconnect(Pid), + {reply, <<"stop bridge successfully">>, State}; + +handle_call(status, _From, State = #state{client_pid = undefined}) -> + {reply, <<"Stopped">>, State}; +handle_call(status, _From, State = #state{client_pid = _Pid})-> + {reply, <<"Running">>, State}; + +handle_call(Req, _From, State) -> + emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info(start, State = #state{reconnect_count = 0}) -> + {noreply, State}; + +%%---------------------------------------------------------------- +%% start in message bridge +%%---------------------------------------------------------------- +handle_info(start, State = #state{options = Options, + client_pid = undefined, + reconnect_time = ReconnectTime, + reconnect_count = ReconnectCount, + type = in}) -> + case emqx_client:start_link([{owner, self()}|options(Options)]) of + {ok, ClientPid, _} -> + Subs = get_value(subscriptions, Options, []), + [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], + {noreply, State#state{client_pid = ClientPid}}; + {error,_} -> + erlang:send_after(ReconnectTime, self(), start), + {noreply, State = #state{reconnect_count = ReconnectCount-1}} + end; + +%%---------------------------------------------------------------- +%% start out message bridge +%%---------------------------------------------------------------- +handle_info(start, State = #state{options = Options, + client_pid = undefined, + reconnect_time = ReconnectTime, + reconnect_count = ReconnectCount, + type = out}) -> + case emqx_client:start_link([{owner, self()}|options(Options)]) of + {ok, ClientPid, _} -> + Subs = get_value(subscriptions, Options, []), + [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], + ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), + [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})], + {noreply, State#state{client_pid = ClientPid}}; + {error,_} -> + erlang:send_after(ReconnectTime, self(), start), + {noreply, State = #state{reconnect_count = ReconnectCount-1}} + end; + +%%---------------------------------------------------------------- +%% received local node message +%%---------------------------------------------------------------- +handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, + State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue, + store_type = StoreType, max_pending_messages = MaxPendingMsg}) -> + Msg = #mqtt_msg{qos = 1, + retain = Retain, + topic = mountpoint(Mountpoint, Topic), + payload = Payload}, + case emqx_client:publish(Pid, Msg) of + {ok, PkgId} -> + {noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; + {error, Reason} -> + emqx_logger:error("Publish fail:~p", [Reason]), + {noreply, State} + end; + +%%---------------------------------------------------------------- +%% received remote node message +%%---------------------------------------------------------------- +handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, + properties := Props, payload := Payload}}, State) -> + NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), + NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain=> Retain}, NewMsg0)), + emqx_broker:publish(NewMsg1), + {noreply, State}; + +%%---------------------------------------------------------------- +%% received remote puback message +%%---------------------------------------------------------------- +handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) -> + % lists:keydelete(PkgId, 1, Queue) + {noreply, State#state{queue = delete(StoreType, PkgId, Queue)}}; + +handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> + {noreply, State#state{client_pid = undefined}}; + +handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, + reconnect_time = ReconnectTime, + def_reconnect_count = DefReconnectCount}) -> + lager:warning("emqx bridge stop reason:~p", [Reason]), + erlang:send_after(ReconnectTime, self(), start), + {noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}}; + +handle_info(Info, State) -> + emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #state{}) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +proto_ver(mqtt3) -> v3; +proto_ver(mqtt4) -> v4; +proto_ver(mqtt5) -> v5. +address(Address) -> + case string:tokens(Address, ":") of + [Host] -> {Host, 1883}; + [Host, Port] -> {Host, list_to_integer(Port)} + end. +options(Options) -> + options(Options, []). +options([], Acc) -> + Acc; +options([{username, Username}| Options], Acc) -> + options(Options, [{username, Username}|Acc]); +options([{proto_ver, ProtoVer}| Options], Acc) -> + options(Options, [{proto_ver, proto_ver(ProtoVer)}|Acc]); +options([{password, Password}| Options], Acc) -> + options(Options, [{password, Password}|Acc]); +options([{keepalive, Keepalive}| Options], Acc) -> + options(Options, [{keepalive, Keepalive}|Acc]); +options([{client_id, ClientId}| Options], Acc) -> + options(Options, [{client_id, ClientId}|Acc]); +options([{clean_start, CleanStart}| Options], Acc) -> + options(Options, [{clean_start, CleanStart}|Acc]); +options([{address, Address}| Options], Acc) -> + {Host, Port} = address(Address), + options(Options, [{host, Host}, {port, Port}|Acc]); +options([_Option | Options], Acc) -> + options(Options, Acc). + +name(Id) -> + list_to_atom(lists:concat([?MODULE, "_", Id])). + +i2b(L) -> iolist_to_binary(L). + +mountpoint(undefined, Topic) -> + Topic; +mountpoint(Prefix, Topic) -> + <>. + +format_mountpoint(undefined) -> + undefined; +format_mountpoint(Prefix) -> + binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + +store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> + [Data | Queue]; +store(memory, _Data, Queue, _MaxPendingMsg) -> + lager:error("Beyond max pending messages"), + Queue; +store(disk, Data, Queue, _MaxPendingMsg)-> + [Data | Queue]. + +delete(memory, PkgId, Queue) -> + lists:keydelete(PkgId, 1, Queue); +delete(disk, PkgId, Queue) -> + lists:keydelete(PkgId, 1, Queue). \ No newline at end of file diff --git a/src/emqx_bridge1_sup.erl b/src/emqx_bridge1_sup.erl new file mode 100644 index 000000000..444c7cfb5 --- /dev/null +++ b/src/emqx_bridge1_sup.erl @@ -0,0 +1,45 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_bridge1_sup). + +-behavior(supervisor). + +-include("emqx.hrl"). + +-export([start_link/0, bridges/0]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% @doc List all bridges +-spec(bridges() -> [{node(), topic(), pid()}]). +bridges() -> + [{Name, emqx_bridge1:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. + +init([]) -> + BridgesOpts = emqx_config:get_env(bridges, []), + Bridges = [spec(Opts)|| Opts <- BridgesOpts], + {ok, {{one_for_one, 10, 100}, Bridges}}. + +spec({Id, Options})-> + #{id => Id, + start => {emqx_bridge1, start_link, [Id, Options]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_bridge1]}. \ No newline at end of file diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 9d332a5f2..7015590d8 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -69,9 +69,9 @@ subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> -spec(subscribe(topic(), pid() | subid(), subid() | subopts()) -> ok). subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> - subscribe(Topic, SubPid, SubId, []); + subscribe(Topic, SubPid, SubId, #{}); subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> - subscribe(Topic, SubPid, SubId, []); + subscribe(Topic, SubPid, SubId, #{}); subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> subscribe(Topic, SubPid, undefined, SubOpts); subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) -> diff --git a/src/emqx_message.erl b/src/emqx_message.erl index ae8670942..da762703e 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -69,7 +69,9 @@ unset_flag(Flag, Msg = #message{flags = Flags}) -> set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) -> Msg#message{headers = Headers}; set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> - Msg#message{headers = maps:merge(Old, New)}. + Msg#message{headers = maps:merge(Old, New)}; +set_headers(_, Msg) -> + Msg. get_header(Hdr, Msg) -> get_header(Hdr, Msg, undefined). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 563244232..cddfea8b5 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -63,6 +63,7 @@ init([]) -> BrokerSup = supervisor_spec(emqx_broker_sup), %% BridgeSup BridgeSup = supervisor_spec(emqx_bridge_sup_sup), + BridgeSup1 = supervisor_spec(emqx_bridge1_sup), %% AccessControl AccessControl = worker_spec(emqx_access_control), %% Session Manager @@ -78,6 +79,7 @@ init([]) -> RouterSup, BrokerSup, BridgeSup, + BridgeSup1, AccessControl, SMSup, SessionSup, diff --git a/src/emqx_time.erl b/src/emqx_time.erl index 97ea4b573..0d74168c4 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -14,7 +14,7 @@ -module(emqx_time). --export([seed/0, now_secs/0, now_ms/0]). +-export([seed/0, now_secs/0, now_ms/0, now_ms/1]). seed() -> rand:seed(exsplus, erlang:timestamp()). @@ -25,3 +25,5 @@ now_secs() -> now_ms() -> erlang:system_time(millisecond). +now_ms({MegaSecs, Secs, MicroSecs}) -> + (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). \ No newline at end of file From d4176461ff6ae736e08a2efd0c466ef300d8d34c Mon Sep 17 00:00:00 2001 From: Petr Gotthard Date: Mon, 20 Aug 2018 11:58:19 +0200 Subject: [PATCH 2/3] Send client_pid to distinguish multiple clients When a controlling process starts multiple clients that make multiple subscriptions it may be desirable to identify from which client a message is comming from. The topic id may not be sufficient. --- src/emqx_client.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 695b9d10c..27f8be353 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -989,7 +989,8 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, State = #state{owner = Owner}) -> Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, - topic => Topic, properties => Props, payload => Payload}}, + topic => Topic, properties => Props, payload => Payload, + client_pid => self()}}, State. packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, From 36647b641fb1b5e1b3f600ee974e8578df5240a2 Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 24 Aug 2018 11:38:54 +0800 Subject: [PATCH 3/3] Fix select emqx_shared_subscription fail --- src/emqx_bridge1.erl | 2 +- src/emqx_shared_sub.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_bridge1.erl b/src/emqx_bridge1.erl index 139711932..cfad74803 100644 --- a/src/emqx_bridge1.erl +++ b/src/emqx_bridge1.erl @@ -19,7 +19,7 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). - -import(proplists, [get_value/2, get_value/3]). +-import(proplists, [get_value/2, get_value/3]). -export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 2e4772f05..5194de9d4 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -97,7 +97,7 @@ pick(SubPids) -> lists:nth((X rem length(SubPids)) + 1, SubPids). subscribers(Group, Topic) -> - ets:select(?TAB, [{{shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). %%----------------------------------------------------------------------------- %% gen_server callbacks