Chinaunix首页 | 论坛 | 博客
  • 博客访问: 183892
  • 博文数量: 29
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 601
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-03 18:51
个人简介

大数据算法,分布式技术,spark技术爱好者

文章分类

全部博文(29)

文章存档

2015年(4)

2014年(3)

2013年(22)

分类: 云计算

2014-12-15 08:47:46

作为scala这种比较冷门的而语言,在公司里联合开发项目的时候会遇到很大的问题。如果能把java,python,scala三种模快结合在一起,并且可以调用彼此的函数,并支持原生对象的跨语言使用,那将大大的提高开发效率。

我是spark的强烈爱好者,当然更是有这样的需求。最近在工程实践中搞定了这个问题,发文大家共享:

scala和java的混编主要的问题是原生对象的转换。python的加入需要利用第三方库py4j:

技术解决方案:

a)       Pyton调用java原生对象及方法:

         经测试jpype对于java原生类型支持不好,而py4j基本支持所有java原生类型,所以采用py4j来实现pythonjvm的交互。

b)       Scala模块的运算结果转换成java原生对象:

Scala支持隐式转换,基本不需要编写专门转换的代码,就可以把scala原生类型转换成java原生类型,返回给python client

具体实现:
    

JavaServer端负责提供各种业务逻辑模块的接口,接受python clientrpc请求,自动把返回的java原生对象转换为python的原生类型。测试了arraylisthashmap等类型都能够很好地转换。

下面的代码把复杂的HashMap[String, ArrayList[String]]自动转换成了pythondict[String,list[String]]类型。
        

·JVMServer启动:

    val gatewayServer: GatewayServer = new GatewayServer( JvmServer)

      gatewayServer.start
    
    

·各种复杂的scala原生对象转换成java原生类型,方便python client读取使用:
    注意import和原生类型的混编,下面是我编写spark项目的一段代码:

  import scala.collection.JavaConversions._

  def joinTable(tableRDD:RDD[Array[String]]): util.HashMap[String, util.List[String]] =

  {

    val arr = new java.util.ArrayList[String]()

    val typedRDD = tableRDD.map(record => (record(0),record(1)))

    val joinedRdd = typedRDD.join(typedRDD)

    val retVal = new util.HashMap[String, util.List[String]]

    joinedRdd.collect.map(x => retVal(x._1) = Seq(x._2._1, x._2._2))

    retVal

   }

    

       Python Client实现(python语言):

·安装 py4jpip install py4j

·编写client

1.获得javaServer的连接

gateway = JavaGateway().entry_point

3.调用jvm serverreadtable方法得到表数据,存入rdd,并且返回rdd的引用:

tabRdd = gateway.readTable(accessId, accessKey, odpsUrl,tunnelUrl )

4.调用jvm serverjointable方法实现一个简单的rddjoin,并返回java原生的hashmap。为了说明对复杂原生类型的支持,这个hashmapvalue字段是一个arraylist

hashMap = gateway.joinTable(tabRdd)

5.打印hashMap,看到调用scala函数返回的结果已经被转换为python的dict原生类型,方便做后续的操作:



print hashMap

{u'610284856': [u'G', u'G'], u'746267708': [u'G', u'G'], u'388575266': [u'G', u'G'], u'133968148': [u'B', u'B'], u'858920471': [u'G', u'G'], u'1937490561': [u'B', u'B'], u'693821356': [u'G', u'G'], u'286352571': [u'G', u'G'], u'31548425': [u'G', u'G'], u'703276015': [u'G', u'G'], u'818248833': [u'G', u'G'], u'1096893684': [u'G', u'G'], u'135501126': [u'G', u'G'], u'1051499876': [u'B', u'B'], u'1092149024': [u'B', u'B'], u'416576560': [u'G', u'G'], u'367637452': [u'G', u'G'], u'734851900': [u'G', u'G'], u'843056883': [u'G', u'G'], u'129367981': [u'G', u'G'], u'140531506': [u'G', u'G'], u'806689117': [u'B', u'B'], u'1665017117': [u'G




    更多的对象类型转换:
scala.collection.Iterable <=> java.lang.Iterable
scala.collection.Iterable <=> java.util.Collection
scala.collection.Iterator <=> java.util.{ Iterator, Enumeration }
scala.collection.mutable.Buffer <=> java.util.List
scala.collection.mutable.Set <=> java.util.Set
scala.collection.mutable.Map <=> java.util.{ Map, Dictionary }
scala.collection.mutable.ConcurrentMap <=> java.util.concurrent.ConcurrentMap


scala.collection.Seq         => java.util.List
scala.collection.mutable.Seq => java.util.List
scala.collection.Set         => java.util.Set
scala.collection.Map         => java.util.Map
java.util.Properties         => scala.collection.mutable.Map[String, String]


    参考:

阅读(6562) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~