关于cassandra集群的数据一致性问题

cassandra集群要求严格的时间同步,有一点同步就会发生这样那样的问题,这个事情我已经在cassandra集群要求严格的时间同步里说明了,所以时间同步是cassandra集群的前提。

cassandra使用的是最后一致性模型,也就是说一开始的并发更新的数据可能是不一致的,但是经过这段不一致的时间之后,系统会达到最终的一致性。让每个客户端看到的结果是一样的。

这个最终一致性的强度,在cassandra中是有你所选的一致性模型决定的。通常使用cassandra,我们选择QUORUM级别,表示有半数副本收到请求的时候,返回客户端响应,这样保证插入的数据,可以肯定被查询到。然而这里存在一个问题,关于并发性,假设客户端对同一条记录进行更新,cassandra是根据什么判断请求的先后呢?只有时间,cassandra会根据请求到达服务器的先后时间。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
QueryOptions options = new QueryOptions();
options.setConsistencyLevel(ConsistencyLevel.QUORUM);
 
Cluster cluster = Cluster.builder()
.addContactPoint("192.168.1.101")
.withCredentials("cassandra", "cassandra")
.withQueryOptions(options)
.build();
 
Session session = cluster.connect();
RegularStatement update10 = QueryBuilder.update("myKeysapce","tableName")
.with(QueryBuilder.set("col2", 10))
.where(QueryBuilder.eq("key1", 1));
session.execute(update10);
 
RegularStatement update20 = QueryBuilder.update("myKeysapce","tableName")
.with(QueryBuilder.set("col2", 20))
.where(QueryBuilder.eq("key1", 1));
session.execute(update20);

但是cassandra集群有多台机器,客户端发到服务器的不同机器上呢?糟了,数据乱掉了。是的,当你使用datastax的驱动程序的时候,你会发现快速对同一条记录进行两次更新,最终的结果有时候并不是第二个请求更新的结果,就像上面的例子,每次更新结果可能是20,也可能是10。即便你的一致性级别选择的ALL,也有可能发生这样的情况。因为两次请求的时间间隔实在很短,而集群的所有机器又不能完全时间同步,即便是使用了ntp同步,时间差也会在ms级别,两次请求发到不同的机器上,就会发生这样的问题。

怎么办呢?当我们换用另外一个cassandra客户端Astyana的时候,我们发现并不会发生上面描述的情况,这是为什么呢?难道客户端有问题,经过调查发现,Astyanax客户端发的两次请求都是发到了集群的同一个节点,而datastax官方驱动客户端,却是发向了不同的节点。

原来Astyanax客户端有一个请求策略的概念,它有三种策略(TOKEN_AWARE,ROUND_ROBIN和BAG),其中TOKEN_AWARE就是根据主键token请求到相同的客户端。
那原生的datastax客户端有没有这样的概念呢?调查后发现也是有的,它叫做LoadBalancingPolicy,可以通过 Cluster.builder().withLoadBalancingPolicy(policy)指定,它也有三个策略,分别是:

DCAwareRoundRobinPolicy
RoundRobinPolicy
TokenAwarePolicy

其中TokenAwarePolicy就是根据token把对同一条记录的请求,发到同一个节点,看代码我们发现datastax默认使用的策略就是TokenAwarePolicy,那为什么没有和Astyana一样的效果呢?

通过阅读它的代码,原因找到了,那就是在更新的时候,要给它指定表的tablemetadata,否则datastatx无法知道哪些字段是主键,额,貌似这个客户端也太傻了。。。
上面的例子改成下面这样,就万事大吉了。

1
2
3
4
TableMetadata metaData = cluster.getMetadata().getKeyspace("myKeyspace").getTable("tableName");
RegularStatement update10 = QueryBuilder.update(metaData)
.with(QueryBuilder.set("col2", 10))
.where(QueryBuilder.eq("key1", 1));
  1. 会喷火的小猪说道:

    最近正在使用cassandra-2.1.9,使用时进行了集群,发现有时候会发生更新失败的情况。使用的是package org.springframework.data.cassandra.core;中的CassandraTemplate类进行的更新,cql确认执行了,但是数据库的内容并没有发生变化,但时有时候又能更新成功,您遇到过这种情况吗

    1. 大岩不灿说道:

      除了文中的均衡策略的问题,还有就是要关注你的集群机器的时间是否同步。

  2. 阙乃祯说道:

    cluster.setLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy(), false));

    要这么设置,标记shuffleReplicas为false,即不打乱存放在treeSet里面hosts的顺便。否则无效

    1. 大岩不灿说道:

      看下驱动的
      com.datastax.driver.core.policies.TokenAwarePolicy
      这个方法:

      public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement statement) {
       
              ByteBuffer partitionKey = statement.getRoutingKey();
              String keyspace = statement.getKeyspace();
              if (keyspace == null)
                  keyspace = loggedKeyspace;
       
              if (partitionKey == null || keyspace == null)
                  return childPolicy.newQueryPlan(keyspace, statement);
       
              final Set<Host> replicas = clusterMetadata.getReplicas(Metadata.quote(keyspace), partitionKey);
              if (replicas.isEmpty())
                  return childPolicy.newQueryPlan(loggedKeyspace, statement);
       
              final Iterator<Host> iter;
              if (shuffleReplicas) {
                  List<Host> l = Lists.newArrayList(replicas);
                  Collections.shuffle(l);
                  iter = l.iterator();
              } else {
                  iter = replicas.iterator();
              }

      如果partitionKey 为null,根本走不下去

      if (partitionKey == null || keyspace == null)
                  return childPolicy.newQueryPlan(keyspace, statement);

      再看partitionKey 是怎么来的,看
      com.datastax.driver.core.querybuilder.Insert
      com.datastax.driver.core.querybuilder.Update等基类
      com.datastax.driver.core.querybuilder.BuiltStatement的构造方法
      如下:

      BuiltStatement(String keyspace) {
              this.partitionKey = null;
              this.routingKey = null;
              this.keyspace = keyspace;
          }
       
          BuiltStatement(TableMetadata tableMetadata) {
              this.partitionKey = tableMetadata.getPartitionKey();
              this.routingKey = new ByteBuffer[tableMetadata.getPartitionKey().size()];
              this.keyspace = escapeId(tableMetadata.getKeyspace().getName());
          }

      你不传TableMetadata ,他是不帮你构造routingKey帮你根据token路由请求的。

    2. 大岩不灿说道:
        public static LoadBalancingPolicy defaultLoadBalancingPolicy() {
              // Note: balancing policies are stateful, so we can't store that in a static or that would screw thing
              // up if multiple Cluster instance are started in the same JVM.
              return new TokenAwarePolicy(new DCAwareRoundRobinPolicy());
          }
       
        public TokenAwarePolicy(LoadBalancingPolicy childPolicy) {
              this(childPolicy, false);
          }
      驱动默认的就是false,你说传true吗?
    3. 大岩不灿说道:

      多谢留言,一直使用的cassandra2.1.3版本java驱动,今天看了下源码,发现在2.1.3版本y的时候默认是false

      public TokenAwarePolicy(LoadBalancingPolicy childPolicy) {
              this(childPolicy, false);
          }

      到了2.1.4版本以上的驱动,此处被修改为了默认为true

      public TokenAwarePolicy(LoadBalancingPolicy childPolicy) {
              this(childPolicy, true);
          }
  3. 云顶吹风说道:

留言

提示:你的email不会被公布,欢迎留言^_^

*

验证码 * Time limit is exhausted. Please reload CAPTCHA.