Integrate with ekka library
This commit is contained in:
parent
e4ac8a56dd
commit
1ea630bc9e
71
etc/emq.conf
71
etc/emq.conf
|
@ -7,14 +7,71 @@
|
|||
## Cluster
|
||||
##--------------------------------------------------------------------
|
||||
|
||||
## The cluster Id
|
||||
cluster.id = emq
|
||||
## Cluster name
|
||||
cluster.name = ekka
|
||||
|
||||
## Cookie for distributed node
|
||||
cluster.cookie = emqsecretcookie
|
||||
## Cluster Cookie
|
||||
cluster.cookie = ekkaclustercookie
|
||||
|
||||
## The multicast address and port.
|
||||
cluster.multicast = 239.192.0.1:44369
|
||||
## Cluster Discovery: static | epmd | multicast | gossip | etcd | consul
|
||||
cluster.discovery = static
|
||||
|
||||
## Cluster Autoheal: on | off
|
||||
cluster.autoheal = on
|
||||
|
||||
## Clean down node of the cluster
|
||||
cluster.clean_down = 1h
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Cluster with epmd
|
||||
|
||||
cluster.epmd.seeds = a@127.0.0.1,b@127.0.0.1
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Cluster with multicast
|
||||
|
||||
## 1 second
|
||||
cluster.mcast.period = 1s
|
||||
|
||||
cluster.mcast.addr = 239.192.0.1:4369
|
||||
|
||||
cluster.mcast.iface = 0.0.0.0
|
||||
|
||||
cluster.mcast.ttl = 1
|
||||
|
||||
cluster.mcast.loop = on
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Cluster with Gossip
|
||||
|
||||
cluster.gossip.seeds = 127.0.0.1:4369
|
||||
|
||||
cluster.gossip.protocol_period = 1s
|
||||
|
||||
cluster.gossip.suspicion_factor = 3
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Cluster with Etcd
|
||||
|
||||
cluster.etcd.addr = 127.0.0.1:2367
|
||||
|
||||
cluster.etcd.prefix = emq
|
||||
|
||||
cluster.etcd.node_ttl = 30m
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Cluster by Consul
|
||||
|
||||
cluster.consul.addr = 127.0.0.1:8500
|
||||
|
||||
cluster.consul.acl_token = example-acl-token
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Discover by Kubernetes
|
||||
|
||||
## cluster.k8s.selector = app=emq
|
||||
|
||||
## cluster.k8s.node_basename = emq
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Node Args
|
||||
|
@ -24,7 +81,7 @@ cluster.multicast = 239.192.0.1:44369
|
|||
node.name = emqttd@127.0.0.1
|
||||
|
||||
## Cookie for distributed node
|
||||
node.cookie = emqsecretcookie
|
||||
## node.cookie = emqsecretcookie
|
||||
|
||||
## SMP support: enable, auto, disable
|
||||
node.smp = auto
|
||||
|
|
173
priv/emq.schema
173
priv/emq.schema
|
@ -5,31 +5,168 @@
|
|||
%% Cluster
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Cluster ID
|
||||
{mapping, "cluster.id", "emqttd.cluster", [
|
||||
{default, "emq"},
|
||||
{datatype, string}
|
||||
%% @doc Cluster name
|
||||
{mapping, "cluster.name", "ekka.cluster_name", [
|
||||
{default, emqcl},
|
||||
{datatype, atom}
|
||||
]}.
|
||||
|
||||
%% @doc Secret cookie for distributed erlang node
|
||||
%% @doc Secret cookie for the cluster
|
||||
{mapping, "cluster.cookie", "vm_args.-setcookie", [
|
||||
{default, "emqsecretcookie"}
|
||||
{default, "emqclustercookie"}
|
||||
]}.
|
||||
|
||||
%% Cluster Multicast Addr
|
||||
{mapping, "cluster.multicast", "emqttd.cluster", [
|
||||
{default, "239.192.0.1:44369"},
|
||||
%% @doc Cluster discovery
|
||||
{mapping, "cluster.discovery", "ekka.cluster_discovery", [
|
||||
{default, manual},
|
||||
{datatype, atom}
|
||||
]}.
|
||||
|
||||
%% @doc Cluster autoheal
|
||||
{mapping, "cluster.autoheal", "ekka.cluster_autoheal", [
|
||||
{datatype, flag},
|
||||
{default, on}
|
||||
]}.
|
||||
|
||||
|
||||
%% @doc Clean down node from the cluster
|
||||
{mapping, "cluster.clean_down", "ekka.cluster_clean_down", [
|
||||
{datatype, {duration, ms}},
|
||||
{default, "1h"}
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cluster with epmd
|
||||
|
||||
{mapping, "cluster.epmd.seeds", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{translation, "emqttd.cluster", fun(Conf) ->
|
||||
Multicast = cuttlefish:conf_get("cluster.multicast", Conf),
|
||||
[Addr, Port] = string:tokens(Multicast, ":"),
|
||||
{ok, Ip} = inet_parse:address(Addr),
|
||||
[{id, cuttlefish:conf_get("cluster.id", Conf)},
|
||||
{multicast, {Ip, list_to_integer(Port)}}]
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cluster with IP Multicast
|
||||
|
||||
{mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [
|
||||
{datatype, ip}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.period", "ekka.cluster_discovery", [
|
||||
{datatype, {duration, ms}},
|
||||
{default, "1s"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [
|
||||
{datatype, string},
|
||||
{default, "0.0.0.0"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [
|
||||
{datatype, integer},
|
||||
{default, 1}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [
|
||||
{datatype, flag},
|
||||
{default, on}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.sndbuf", "ekka.cluster_discovery", [
|
||||
{datatype, bytesize},
|
||||
{default, "16KB"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.recbuf", "ekka.cluster_discovery", [
|
||||
{datatype, bytesize},
|
||||
{default, "16KB"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.mcast.buffer", "ekka.cluster_discovery", [
|
||||
{datatype, bytesize},
|
||||
{default, "32KB"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.gossip.seeds", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.gossip.protocol_period", "ekka.cluster_discovery", [
|
||||
{datatype, {duration, ms}},
|
||||
{default, "1s"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.gossip.suspicion_factor", "ekka.cluster_discovery", [
|
||||
{datatype, integer},
|
||||
{default, 3}
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cluster with Etcd
|
||||
|
||||
{mapping, "cluster.etcd.addr", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.etcd.prefix", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.etcd.node_ttl", "ekka.cluster_discovery", [
|
||||
{datatype, {duration, ms}},
|
||||
{default, "1m"}
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cluster with Consul
|
||||
|
||||
{mapping, "cluster.consul.addr", "ekka.cluster_discovery", [
|
||||
{datatype, ip}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.consul.acl_token", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{translation, "ekka.cluster_discovery", fun(Conf) ->
|
||||
Strategy = cuttlefish:conf_get("cluster.discovery", Conf),
|
||||
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
|
||||
IpPort = fun(S) ->
|
||||
[Addr, Port] = string:tokens(S, ":"),
|
||||
{ok, Ip} = inet:parse_address(Addr),
|
||||
{Ip, Port}
|
||||
end,
|
||||
Options = fun(static) ->
|
||||
[{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)}];
|
||||
(mcast) ->
|
||||
{Addr, Port} = cuttlefish:conf_get("cluster.mcast.addr", Conf),
|
||||
{ok, Ip} = inet:parse_address(Addr),
|
||||
{ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)),
|
||||
[{addr, Ip}, {port, Port}, {iface, Iface},
|
||||
{period, cuttlefish:conf_get("cluster.mcast.period", Conf)},
|
||||
{ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)},
|
||||
{loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}];
|
||||
(etcd) ->
|
||||
[{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)},
|
||||
{clean_down, cuttlefish:conf_get("cluster.epmd.clean_down", Conf, undefined)}];
|
||||
(gossip) ->
|
||||
[{seeds, [IpPort(S) || S <- string:tokens(",", cuttlefish:conf_get("cluster.gossip.seeds", Conf))]},
|
||||
{protocol_period, cuttlefish:conf_get("cluster.gossip.protocol_period", Conf)},
|
||||
{suspicion_factor, cuttlefish:conf_get("cluster.gossip.suspicion_factor", Conf, 3)}];
|
||||
(etcd) ->
|
||||
[{addr, cuttlefish:conf_get("cluster.etcd.addr", Conf)},
|
||||
{prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emq")},
|
||||
{node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}];
|
||||
(consul) ->
|
||||
[{addr, cuttlefish:conf_get("cluster.consul.addr", Conf)},
|
||||
{acl_token, cuttlefish:conf_get("cluster.consul.acl_token", Conf)}];
|
||||
(k8s) ->
|
||||
[{host, cuttlefish:conf_get("cluster.k8s.selector", Conf)},
|
||||
{acl_token, cuttlefish:conf_get("cluster.k8s.node_basename", Conf)}];
|
||||
(manual) ->
|
||||
[ ]
|
||||
end,
|
||||
{Strategy, Filter(Options(Strategy))}
|
||||
end}.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Erlang Node
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -40,9 +177,9 @@ end}.
|
|||
]}.
|
||||
|
||||
%% @doc Secret cookie for distributed erlang node
|
||||
{mapping, "node.cookie", "vm_args.-setcookie", [
|
||||
{default, "emqsecretcookie"}
|
||||
]}.
|
||||
%% {mapping, "node.cookie", "vm_args.-setcookie", [
|
||||
%% {default, "emqsecretcookie"}
|
||||
%%]}.
|
||||
|
||||
%% @doc SMP Support
|
||||
{mapping, "node.smp", "vm_args.-smp", [
|
||||
|
|
Loading…
Reference in New Issue