java实现cassandra的增删改查

cassandra使用cql语言作为操作语言,cassandra在2.0之后,在操作上越来越像sql数据库的操作,这样想从传统关系型数据库,切换到cassandra的花,上手成本也越来越低。使用官方java驱动操作cassandra 非常简单。

maven引入驱动包

1
2
3
4
5
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.0.1</version>
</dependency>

1、创建应用的唯一session。

1
2
3
Cluster cluster = Cluster.builder()
                         .addContactPoint("192.168.22.161")
                         .build();

这里构建一个集群对象,”192.168.22.161″ 是cassandra的种子节点(seed node).

1
Session session = cluster.connect();

你也可以针对一个特定的keyspace获取一个session

1
Session session = cluster.connect("mykeyspace");

session是线程安全的,所以一个应用中,你可以只有一个session实例,官方建议一个keyspace一个session。
2、session可以直接支持执行cql语句。

1
2
String cql = "select * from mykeyspace.tablename;";
session.execute(cql);

你完全可以用这种方式完成任意操作,记住cql语句后面一定要带分号。

3、如果你不想繁琐的去拼字符串,你可以用com.datastax.driver.core.Querybuilder。

insert 一条记录String cql = “insert into mykeyspace.tablename(a,b) values(1,2);”
你可以这样写:

1
2
3
session.execute(
QueryBuilder.insertInto("mykeyspace", "tablename")
            .values(new String[]{"a","b"}, new Object[]{1,2}));

delete 记录String cql = “delete from mykeyspace.tablename where a=1”;
你可以这样写:

1
2
3
session.execute(QueryBuilder.delete()
	   .from("mykeyspace", "tablename")
	   .where(QueryBuilder.eq("a", 1)));

update 记录String cql = “update mykeyspace.tablename set b=2 where a=1”

1
2
3
session.execute(QueryBuilder.update("mykeyspace", "tablename")
       .with(QueryBuilder.set("b", 2))
       .where(QueryBuilder.eq("a", 1)));

select 记录String cql = “select a, b from mykeyspace.tablename where a>1 and b<0”

1
2
3
4
5
6
7
8
9
10
11
ResultSet result = session.execute(QueryBuilder.select("a","b")
		.from("mykeyspace", "tablename")
		.where(QueryBuilder.gt("a", 1))
		.and(QueryBuilder.lt("a", 1)));
Iterator<Row> iterator = result.iterator();
while(iterator.hasNext())
{
	Row row = iterator.next();
	row.getInt("a");
	row.getInt("b");
}

注:cassandra的查询的支持是很有限的,对于查询的限制可以参考:
http://zhaoyanblog.com/archives/164.html

4、你也可以像jdbc那样使用预编译占位符的方式。

1
2
3
4
5
BoundStatement bindStatement = 
session.prepare(
"select * from mykeyspace.tablename where a=? and b=?")
.bind("1","2");
session.execute(bindStatement);

或者

1
2
3
4
5
6
PreparedStatement prepareStatement = 
session.prepare(
"select * from mykeyspace.tablename where a=? and b=?");
BoundStatement bindStatement = 
     new BoundStatement(prepareStatement).bind("1","2");
session.execute(bindStatement);

或者

1
2
3
4
5
6
7
Insert insert = 
QueryBuilder.insertInto("mykeyspace", "tablename")
.values(new String[]{"a","b"}, 
new Object[]{QueryBuilder.bindMarker(),QueryBuilder.bindMarker()});
BoundStatement bindStatement = 
    new BoundStatement(session.prepare(insert)).bind("1","2");
session.execute(bindStatement);

5、批量batch的方式也有的。

1
2
3
4
BatchStatement batchStatement = new BatchStatement();
batchStatement.add(insert);
batchStatement.add(bindStatement);
session.execute(batchStatement);

注:批量操作只支持INSERT,UPDATE,DELETE
cassandra新版的java驱动,是灵活多变的,文章里只是简单的示例了下,你可以通过驱动的源码,或者是官方的api文档,灵活的使用。

附:
增删改查demo

  1. lipengcheng说道:

    拜托了岩哥!!!初学者,还请多多指教!问题在下边,已经贴出来了!

  2. lipengcheng说道:

    报这个问题,log4j:WARN No appenders could be found for logger (com.datastax.driver.core.FrameCompressor).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Exception in thread “main” com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /222.197.221.227:9042 (com.datastax.driver.core.ConnectionException: [/222.197.221.227:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/222.197.221.227:9042] Connection has been closed)))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1145)
    at com.datastax.driver.core.Cluster.init(Cluster.java:149)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:225)
    at com.zhaoyanblog.CassandraMain.main(CassandraMain.java:27)
    下边是代码:
    public class CassandraMain{
    public static void main(String[] args)
    {
    QueryOptions options = new QueryOptions();
    options.setConsistencyLevel(ConsistencyLevel.QUORUM);

    Cluster cluster = Cluster.builder()
    .addContactPoint(“222.197.221.227”)
    .build();

    Session session = cluster.connect();
    session.execute(“CREATE KEYSPACE kp WITH replication = {‘class’: ‘SimpleStrategy’, ‘replication_factor’: 1};”);

    // 閽堝keyspace鐨剆ession锛屽悗闈㈣〃鍚嶅墠闈笉鐢ㄥ姞keyspace
    Session kpSession = cluster.connect(“kp”);
    kpSession.execute(“CREATE TABLE tbl(a INT, b INT, c INT, PRIMARY KEY(a));”);

    RegularStatement insert = QueryBuilder.insertInto(“kp”, “tbl”).values(new String[] {“a”, “b”, “c”}, new Object[] {1, 2, 3});
    kpSession.execute(insert);

    RegularStatement insert2 = QueryBuilder.insertInto(“kp”, “tbl”).values(new String[] {“a”, “b”, “c”}, new Object[] {3, 2, 1});
    kpSession.execute(insert2);

    RegularStatement delete = QueryBuilder.delete().from(“kp”, “tbl”).where(QueryBuilder.eq(“a”, 1));
    kpSession.execute(delete);

    RegularStatement update = QueryBuilder.update(“kp”, “tbl”).with(QueryBuilder.set(“b”, 6)).where(QueryBuilder.eq(“a”, 3));
    kpSession.execute(update);

    RegularStatement select = QueryBuilder.select().from(“kp”, “tbl”).where(QueryBuilder.eq(“a”, 3));
    ResultSet rs = kpSession.execute(select);
    Iterator iterator = rs.iterator();
    while (iterator.hasNext())
    {
    Row row = iterator.next();
    System.out.println(“a=” + row.getInt(“a”));
    System.out.println(“b=” + row.getInt(“b”));
    System.out.println(“c=” + row.getInt(“c”));
    }
    kpSession.close();
    session.close();
    cluster.close();
    }

    }

    1. 大岩不灿说道:

      网络是不是不通啊?

  3. 诺言说道:

    您好!
    我在使用您提供的demo代码的时候,每次运行都连不上Cassandra的数据库,总报下面的异常信息
    log4j:WARN No appenders could be found for logger (com.datastax.driver.core.FrameCompressor).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Exception in thread “main” com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.43.61:9160 (com.datastax.driver.core.TransportException: [/192.168.43.61:9160] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1145)
    at com.datastax.driver.core.Cluster.init(Cluster.java:149)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:225)
    at CassandraMain.main(CassandraMain.java:26)

    我的java代码如下:
    public class CassandraMain {

    public static void main(String[] args)
    {
    Cluster cluster = Cluster.builder()
    .addContactPoint(“192.168.43.61”)
    .withPort(9160)
    .build();

    // 针对qfddb的session,后面表名前面不用加qfddb
    Session kpSession = cluster.connect();

    RegularStatement select = QueryBuilder.select().from(“qfddb”, “appidmod1”);//.where(QueryBuilder.eq(“a”, 3))
    ResultSet rs = kpSession.execute(select);
    Iterator iterator = rs.iterator();
    while (iterator.hasNext())
    {
    Row row = iterator.next();
    System.out.println(“123”);
    }
    kpSession.close();
    cluster.close();
    }

    }
    初次学习Cassandra,还望多多指教,谢谢!

    1. 大岩不灿说道:

      端口是9042 不是9160

  4. 云顶吹风说道:

    写的很好很详细。

  5. 大岩不灿说道:

    demo附件见正文末尾~

    1. 卓越说道:

      谢谢岩哥,如果有空写写cassandra的连接池方面的知识吧,你的博客我收藏了,向你学习

  6. 卓越说道:

    第一次接触cassandra,刚用三台电脑搭建了一个集群,win7系统下(没时间换linux),求大神给个java连接cassandra集群的增删改差demo

  7. 张霞说道:

    你好,公司领导让我做一个增删改查demo出来,看了贵贴觉得写的很详细,但是就是不知道如何下手 maven引入驱动包不知道该怎么引入,还有就是这个Cluster cluster = Cluster.builder() .addContactPoint(“192.168.22.161”) .build();不需要制定端口吗?
    TTransport tr = new TFramedTransport(new TSocket(“192.168.23.143”,9160));
    TProtocol proto = new TBinaryProtocol(tr);
    Cassandra.Client client = new Cassandra.Client(proto);
    tr.open(); 我是在网上搜罗到了这种写法,感觉跟你的比显得我写的这个代码好笨拙。

    1. 大岩不灿说道:

      cassandra的java客户端又两种,一种是thrift协议的接口实现的,这个是cassandra早期版本唯一支持的远程通信协议,默认端口9160。另外一种是新出来的native协议的接口实现的,驱动是datastax官方出的,据说比thrift接口性能高。默认端口是9042

      我使用的是datastax接口实现的客户端驱动包。
      如果你没有使用过maven,你可以直接到官方网站下载源码编译。https://github.com/datastax
      你最好学下maven的使用,它的驱动都是通过maven发布的。

      1. 张霞说道:

        //包装好的socket
        TTransport tr = new TFramedTransport(new TSocket(“192.168.23.143”,9160));
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        try {
        tr.open();

        if(!tr.isOpen())
        {
        System.out.println(“failed to connect server!”);
        return;
        }

        long temp = System.currentTimeMillis();

        client.set_keyspace(“DEMO”);//使用DEMO keyspace
        大师,我set_keyspace(“DEMO”)就有错,这是什么原因了?百度找不到答案,求解跪谢了。
        InvalidRequestException(why:Keyspace DEMO does not exist)
        at org.apache.cassandra.thrift.Cassandra$set_keyspace_result$set_keyspace_resultStandardScheme.read(Cassandra.java:9076)
        at org.apache.cassandra.thrift.Cassandra$set_keyspace_result$set_keyspace_resultStandardScheme.read(Cassandra.java:9062)
        at org.apache.cassandra.thrift.Cassandra$set_keyspace_result.read(Cassandra.java:9012)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
        at org.apache.cassandra.thrift.Cassandra$Client.recv_set_keyspace(Cassandra.java:608)
        at org.apache.cassandra.thrift.Cassandra$Client.set_keyspace(Cassandra.java:595)
        at com.wt.ts.TestDemo.main(TestDemo.java:47)

        1. 大岩不灿说道:

          提示很直接啊 :Keyspace DEMO does not exist
          keyspace不存在,你要先创建keyspace吧。

      2. zx说道:

        Cluster cluster = Cluster.builder()
        .addContactPoint(“192.168.23.143”)
        .build();
        Session session = cluster.connect(“guangzhoutestspace”);
        String cql = “select * from mykeyspace.tablename;”;
        session.execute(cql);
        Exception in thread “main” com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.23.143 (com.datastax.driver.core.TransportException: [/192.168.23.143] Cannot connect))
        at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:195)
        at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1029)
        at com.datastax.driver.core.Cluster.init(Cluster.java:120)
        at com.datastax.driver.core.Cluster.connect(Cluster.java:197)
        at com.datastax.driver.core.Cluster.connect(Cluster.java:225)
        at com.wt.test.Test_demo.cans.Cansdraga.main(Cansdraga.java:33)

        1. zx说道:

          为什么了连接不上了 麻烦指教下 //包装好的socket
          TTransport tr = new TFramedTransport(new TSocket(“192.168.23.143”,9160));
          TProtocol proto = new TBinaryProtocol(tr); 这种方式可以连接 谢谢

        2. 大岩不灿说道:

          我勒个去, 你到底想用哪个客户端哟?

        3. 大岩不灿说道:

          如果你没有创建keyspace, 你可以先创建
          Cluster cluster = Cluster.builder()
          .addContactPoint(“192.168.23.143″)
          .build();
          Session session = cluster.connect();
          session.execute(“CREATE KEYSPACE mykeyspace;”);
          然后创建表
          session.execute(“CREATE mykeyspace.mytable(int a, int b, primary key(a));”);
          然后可以插数据什么了。
          如果你增删改查的时候,表名不想前面加keyspace。
          可以再生产一个连接该keyspace的session
          Session session = cluster.connect(“mykeyspace”);

          1. zx说道:

            Cluster cluster = Cluster.builder()
            .addContactPoint(“192.168.23.143”).build();
            Session session = cluster.connect();
            session.execute(“CREATE KEYSPACE mykeyspace;”);
            log4j:WARN No appenders could be found for logger (com.datastax.driver.core.FrameCompressor).
            log4j:WARN Please initialize the log4j system properly.
            Exception in thread “main” java.lang.NoClassDefFoundError: com/google/common/util/concurrent/FutureCallback
            at com.datastax.driver.core.Cluster.(Cluster.java:88)
            at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:144)
            at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:906)
            at com.whaty.framework.cassandra.dao.TestCa.main(TestCa.java:11)
            Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.FutureCallback
            at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
            at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
            … 4 more 我想请求下大师,在编写代码之前,这个前期准备工作需要做足哪些了?为什么我用这种方式就是连接有问题,是我哪方面前期准备没做好还是?用另一种方式就可以连接。

          2. zx说道:

            还需要哪些包能够给说下吗?我的是Linux部署了个服务器apache-cassandra-2.0.10 部署了这个代码,然后我需要maven java去连接他操作数据 想用你发表的CQL这种模式去操作,觉得这个方便像SQL,你看我这个服务器部署是否有问题了?谢谢大师指点

          3. 大岩不灿说道:

            明天晚上我公布个demo吧,感觉和你说不清楚啊~

          4. zx说道:

            大师,有劳了。有demo就好了,谢谢了 我的不知道是环境问题还是什么问题,连接客户端就报异常了 提示IP连接不上 。 Cluster cluster = HFactory.getOrCreateCluster(“Test Cluster”, “192.168.23.143:9160”);

            Keyspace keyspace = HFactory.createKeyspace(“guangzhoutestspace”, cluster);

            Mutator mutator = HFactory.createMutator(keyspace, stringSerializer);

            mutator.insert(“sample”, “Student”, HFactory.createStringColumn(“loginName”, “admin”));
            mutator.insert(“sample”, “Student”, HFactory.createStringColumn(“password”, “admin”));
            mutator.insert(“sample”, “Student”, HFactory.createStringColumn(“ddd”, “admin”));
            mutator.insert(“sample”, “Student”, HFactory.createStringColumn(“aa”, “admin”));
            System.out.println(“数据插入成功…”);
            System.out.println(); 我后面又找了这种代码做了个增删改查,但是感觉还是没您发布的CQL方式好,希望大师能够给个demo 跪谢了。

  8. 张霞说道:

    你好,第一次学这个,就是这个驱动包是怎么回事,怎么下载了。谢谢

留言

提示:你的email不会被公布,欢迎留言^_^

*

验证码 *