FunDA (16) - Demo: Integrated Parallel Computation - total parallelism solution
Published: 2019-06-07


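In the previous two posts we covered two forms of parallel computation: building a data source in parallel, and running user-defined functions in parallel. Each was demonstrated on its own. This post demonstrates a pattern that combines both. The parallel data source is also built differently this time. Previously we knew in advance that the source would be assembled from three independent streams. But what if we have a stream of unknown length in which every element stands for a separate data stream? The AQMRPT table holds air-quality measurements from 1999 to 2xxx, so we can generate one stream per year and build a single data source from those streams in parallel. We reuse the groundwork code from the previous demo as-is, including the NORMAQM table initialization and the functions that look up ids in the STATES and COUNTIES tables by name: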

```scala
val db = Database.forConfig("h2db")

//drop original table schema
val futVectorTables = db.run(MTable.getTables)
val futDropTable = futVectorTables.flatMap { tables =>
  val tableNames = tables.map(t => t.name.name)
  if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
    db.run(NORMAQMQuery.schema.drop)
  else Future.successful(())
}.andThen {
  case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
  case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
}
Await.ready(futDropTable, Duration.Inf)

//create new table to refine AQMRawTable
val actionCreateTable = Models.NORMAQMQuery.schema.create
val futCreateTable = db.run(actionCreateTable).andThen {
  case Success(_) => println("Table created successfully!")
  case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
}
//would carry on even if creating the table fails
Await.ready(futCreateTable, Duration.Inf)

//truncate data, only available in slick 3.2.1
val futTruncateTable = futVectorTables.flatMap { tables =>
  val tableNames = tables.map(t => t.name.name)
  if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
    db.run(NORMAQMQuery.schema.truncate)
  else Future.successful(())
}.andThen {
  case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
  case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
}
//wait for the truncation to finish
Await.ready(futTruncateTable, Duration.Inf)

//a conceived task for the purpose of resource consumption
//getting id with corresponding name from STATES table
def getStateID(state: String): Int = {
  //create a stream for state id with state name
  implicit def toState(row: StateTable#TableElementType) = StateModel(row.id, row.name)
  val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
  val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
  //construct a static stream from the loaded rows
  val stateStream = fda_staticSource(stateSeq)()
  var id = -1
  def getid: FDAUserTask[FDAROW] = row => {
    row match {
      case StateModel(stid, stname) =>   //target row type
        if (stname.contains(state)) {
          id = stid
          fda_break      //exit
        }
        else fda_skip   //take next row
      case _ => fda_skip
    }
  }
  stateStream.appendTask(getid).startRun
  id
}

//another conceived task for the purpose of resource consumption
//getting id with corresponding names from COUNTIES table
def getCountyID(state: String, county: String): Int = {
  //create a stream for county id with state name and county name
  implicit def toCounty(row: CountyTable#TableElementType) = CountyModel(row.id, row.name)
  val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
  val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
  //construct a static stream from the loaded rows
  val countyStream = fda_staticSource(countySeq)()
  var id = -1
  def getid: FDAUserTask[FDAROW] = row => {
    row match {
      case CountyModel(cid, cname) =>   //target row type
        if (cname.contains(state) && cname.contains(county)) {
          id = cid
          fda_break      //exit
        }
        else fda_skip   //take next row
      case _ => fda_skip
    }
  }
  countyStream.appendTask(getid).startRun
  id
}
```
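Each lookup above loads the whole STATES (or COUNTIES) table into a static stream and walks it until `fda_break` fires on the first name match. The comments call it a deliberately expensive, "conceived" task, because it is repeated for every input row. Stripped of FunDA, the computation itself is a first-match substring search. The plain-Scala sketch below shows only those semantics; `StateRow`, `CountyRow`, the "state county" name format and the sample data are hypothetical and not part of FunDA or the post's `Models`.

```scala
// Hypothetical stand-ins for the StateModel / CountyModel rows.
final case class StateRow(id: Int, name: String)
final case class CountyRow(id: Int, name: String)

// Same result as getStateID: id of the first row whose name contains `state`
// (the fda_break case), or -1 when no row matches.
def findStateId(rows: Seq[StateRow], state: String): Int =
  rows.find(_.name.contains(state)).map(_.id).getOrElse(-1)

// Same result as getCountyID: the name must contain both the state and the county.
def findCountyId(rows: Seq[CountyRow], state: String, county: String): Int =
  rows.find(r => r.name.contains(state) && r.name.contains(county)).map(_.id).getOrElse(-1)

val states = Seq(StateRow(1, "Alabama"), StateRow(2, "Alaska"), StateRow(3, "Arizona"))
val counties = Seq(CountyRow(10, "Alaska Juneau"), CountyRow(11, "Alabama Autauga"))

println(findStateId(states, "Alaska"))                // 2
println(findCountyId(counties, "Alabama", "Autauga")) // 11
```

Because `contains` is a substring test, a short query such as "Ala" matches the first row, Alabama, which is also how the original filter behaves.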

We also reuse the two user-defined functions from the previous demo:

```scala
//process input row and produce action row to insert into NORMAQM
def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
  row match {
    case aqm: AQMRPTModel =>
      if (aqm.valid) {
        val stateId = getStateID(aqm.state)
        val countyId = getCountyID(aqm.state, aqm.county)
        val action = NORMAQMQuery += NORMAQMModel(0, aqm.mid, stateId, countyId, aqm.year, aqm.value, aqm.total)
        fda_next(FDAActionRow(action))
      }
      else fda_skip
    case _ => fda_skip
  }
}

//runner for the action rows
val runner = FDAActionRunner(slick.jdbc.H2Profile)
def runInsertAction: FDAUserTask[FDAROW] = row =>
  row match {
    case FDAActionRow(action) =>
      runner.fda_execAction(action)(db)
      fda_skip
    case _ => fda_skip
  }
```
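The two functions form a two-stage pipeline: the first turns every valid AQMRPT row into an insert action and skips everything else, and the second executes each action and emits nothing downstream. A minimal plain-Scala sketch of that contract follows, with `Option` standing in for `fda_next`/`fda_skip` and a callback standing in for `fda_execAction`. All type and function names here are hypothetical.

```scala
// Hypothetical stand-ins for AQMRPTModel (input) and FDAActionRow (output).
sealed trait Row
final case class Reading(state: String, county: String, year: Int, value: Int, valid: Boolean) extends Row
final case class InsertAction(stateId: Int, countyId: Int, year: Int, value: Int) extends Row

// Stage 1, like getIdsThenInsertAction: Some(action) plays the role of fda_next,
// None the role of fda_skip (invalid rows and unexpected row types are dropped).
def toAction(stateId: String => Int, countyId: (String, String) => Int): Row => Option[Row] = {
  case r: Reading if r.valid =>
    Some(InsertAction(stateId(r.state), countyId(r.state, r.county), r.year, r.value))
  case _ => None
}

// Stage 2, like runInsertAction: perform the side effect and pass nothing downstream.
def runAction(execute: InsertAction => Unit): Row => Option[Row] = {
  case a: InsertAction =>
    execute(a)
    None
  case _ => None
}

val executed = scala.collection.mutable.ListBuffer.empty[InsertAction]
val input: Seq[Row] = Seq(
  Reading("Alaska", "Juneau", 1999, 10, valid = true),
  Reading("Alaska", "Sitka", 1999, 20, valid = false))

// Fixed ids (2 and 7) replace the table lookups in this sketch.
val actions = input.flatMap(toAction(_ => 2, (s, c) => 7))
val leftover = actions.flatMap(runAction(a => executed += a))
```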

Next comes the code that is new in this post. First we build a stream of all the years:

```scala
//create parallel sources
//get a stream of years
val qryYears = AQMRPTQuery.map(_.year).distinct
case class Years(year: Int) extends FDAROW
implicit def toYears(y: Int) = Years(y)
val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
val yearStream = fda_staticSource(yearSeq)()
```
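On in-memory data, the year query amounts to a `distinct` over the year column followed by wrapping each value in a row type. The sketch below assumes a hypothetical `Measurement` row. Note that SQL `DISTINCT` promises no particular order, while Scala's `distinct` keeps first-occurrence order.

```scala
// Hypothetical stand-in for an AQMRPT row; only the year matters here.
final case class Measurement(year: Int, value: Int)
// Mirrors `case class Years(year: Int) extends FDAROW` from the post.
final case class Years(year: Int)

// In-memory analogue of AQMRPTQuery.map(_.year).distinct plus the Int => Years conversion.
def distinctYears(rows: Seq[Measurement]): Seq[Years] =
  rows.map(_.year).distinct.map(Years(_))

val years = distinctYears(Seq(Measurement(1999, 5), Measurement(2000, 7), Measurement(1999, 9)))
println(years) // List(Years(1999), Years(2000))
```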

Below is a function that reads the rows of a given year from the AQMRPT table:

```scala
//strong row type
implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
  AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)
//shared stream loader when operating in parallel mode
val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
//loading rows with year yr
def loadRowsInYear(yr: Int) = {
  //a new query
  val query = AQMRPTQuery.filter(row => row.year === yr)
  //reuse same loader
  AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
}
```
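Sharing one loader across concurrent per-year calls works in the sketch below because the loader keeps no per-query state: each call builds its own predicate and receives its own lazy iterator. `RowLoader` and `AqmRow` are hypothetical. That `AQMRPTLoader` is safe to share for the same reason is an assumption drawn from the post's comments ("shared stream loader", "reuse same loader"), not something the post spells out.

```scala
final case class AqmRow(rid: Int, year: Int, value: Int)

// Hypothetical shared loader. It holds only the immutable table, no per-query state,
// so many callers can use the same instance at the same time.
final class RowLoader(table: Vector[AqmRow]) {
  // Every call gets its own lazily evaluated iterator over the matching rows.
  def stream(pred: AqmRow => Boolean): Iterator[AqmRow] = table.iterator.filter(pred)
}

val sharedLoader = new RowLoader(Vector(AqmRow(1, 1999, 10), AqmRow(2, 2000, 20), AqmRow(3, 1999, 30)))

// Mirrors loadRowsInYear: a new query (here a predicate) per year, the same loader every time.
def loadRowsInYear(yr: Int): Iterator[AqmRow] = sharedLoader.stream(_.year == yr)
```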

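Note that the concurrent invocations of loadRowsInYear will all share the single FDAStreamLoader instance AQMRPTLoader. The type of a user-defined data-loading function is FDASourceLoader. Here is an example FDASourceLoader: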

```scala
//loading rows by year
def loadRowsByYear: FDASourceLoader = row => {
  row match {
    case Years(y) => loadRowsInYear(y) //produce stream of the year
    case _ => fda_appendRow(FDANullRow)
  }
}
```
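An `FDASourceLoader` maps one incoming row to a whole stream of rows. The sketch models that shape as `Row => Iterator[Row]`: a `Years` row expands into that year's readings, and any other row yields a single null-row marker, as `fda_appendRow(FDANullRow)` does above. `SourceLoader`, `Reading` and `NullRow` are hypothetical names, not FunDA types.

```scala
sealed trait Row
final case class Years(year: Int) extends Row
final case class Reading(year: Int, value: Int) extends Row
case object NullRow extends Row // stand-in for FDANullRow

// Hypothetical analogue of FDASourceLoader: one incoming row expands into a stream of rows.
type SourceLoader = Row => Iterator[Row]

val readings = Vector(Reading(1999, 10), Reading(2000, 20), Reading(1999, 30))

val loadRowsByYear: SourceLoader = {
  case Years(y) => readings.iterator.filter(_.year == y) // that year's rows
  case _        => Iterator.single(NullRow)              // like fda_appendRow(FDANullRow)
}
```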

We use toParSource to build a parallel data source:

```scala
//get parallel source constructor
val parSource = yearStream.toParSource(loadRowsByYear)
```
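Judging by its comment ("get parallel source constructor"), `toParSource` only describes the per-year sources, and running them is left to whatever consumes the result. The sketch below illustrates that split with thunks: mapping the year stream yields one unstarted loader per year, and nothing is loaded until a thunk is invoked. The names and sample rows are hypothetical; this is not how FunDA implements `toParSource` internally.

```scala
import java.util.concurrent.atomic.AtomicInteger

final case class Years(year: Int)

// Counts how many per-year loads have actually been executed.
val loadsRun = new AtomicInteger(0)

// Hypothetical per-year loader; increments the counter each time it really runs.
def loadRowsInYear(yr: Int): Seq[String] = {
  loadsRun.incrementAndGet()
  Seq(s"$yr-a", s"$yr-b")
}

val yearStream: Iterator[Years] = Iterator(Years(1999), Years(2000), Years(2001))

// One unstarted source per year: each element is a thunk that loads that year's rows when called.
val parSource: Iterator[() => Seq[String]] =
  yearStream.map(y => () => loadRowsInYear(y.year))
```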

fda_par_source then merges the parallel sources into a single data stream:

```scala
//produce a stream from parallel sources
val source = fda_par_source(parSource)(3)
```
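Conceptually, `fda_par_source` runs several year sources at once and merges their rows into one stream. The sketch below reproduces that fan-out/merge shape eagerly with `Future`s on a fixed-size pool, assuming that the second argument (3 in the post) bounds how many sources run concurrently. Unlike a streaming merge, it materializes every year before returning and keeps the results in year order, whereas in the FunDA stream rows from different years may interleave. `loadRowsInYear` here is a hypothetical stub.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical per-year loader; in the post this is a Slick query on AQMRPT.
def loadRowsInYear(yr: Int): Seq[(Int, Int)] = (1 to 3).map(i => (yr, i))

// Run at most `maxOpen` per-year loads at a time and merge their rows into one sequence.
def parSource(years: Seq[Int], maxOpen: Int): Seq[(Int, Int)] = {
  val pool = Executors.newFixedThreadPool(maxOpen)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val perYear = years.map(y => Future(loadRowsInYear(y)))
    // Future.sequence keeps input order, so the merged result is grouped by year.
    Await.result(Future.sequence(perYear), 1.minute).flatten
  } finally pool.shutdown()
}
```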

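source is an FDAPipeLine: it can be run directly with source.startRun, or further stages can be attached after it. Below we turn the other two user-defined functions into parallel tasks and chain them after source: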

```scala
//the following is a process of composition of stream combinators
//get parallel source constructor
val parSource = yearStream.toParSource(loadRowsByYear)
//implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
//produce a stream from parallel sources
val source = fda_par_source(parSource)(3)
//turn getIdsThenInsertAction into parallel task
val parTasks = source.toPar(getIdsThenInsertAction)
//runPar to produce a new stream
val actionStream = fda_runPar(parTasks)(3)
//turn runInsertAction into parallel task
val parRun = actionStream.toPar(runInsertAction)
//runPar and carry out by startRun
fda_runPar(parRun)(2).startRun
```
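The composed pipeline has three stages, each with its own degree of parallelism: the merged source (3), the id lookup that builds insert actions (3), and the insert runner (2). The sketch models each `toPar`/`fda_runPar` pair as a hypothetical `runPar` helper backed by its own fixed-size pool. One simplification: here every stage finishes before the next begins, whereas in the FunDA stream the stages overlap as rows flow through.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a toPar + fda_runPar pair: apply `f` to every element on a
// dedicated pool of `parallelism` threads, keep the Somes (fda_next), drop the Nones (fda_skip).
def runPar[A, B](in: Seq[A], parallelism: Int)(f: A => Option[B]): Seq[B] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try Await.result(Future.traverse(in)(a => Future(f(a))), 1.minute).flatten
  finally pool.shutdown()
}

final case class Reading(year: Int, value: Int, valid: Boolean)
final case class Insert(year: Int, value: Int)

val inserted = new AtomicInteger(0)
// Stand-in for the merged output of fda_par_source(parSource)(3).
val source = Seq(Reading(1999, 1, valid = true), Reading(1999, 2, valid = false), Reading(2000, 3, valid = true))

// 3 threads, like getIdsThenInsertAction: valid rows become insert actions.
val actions = runPar(source, 3)(r => if (r.valid) Some(Insert(r.year, r.value)) else None)

// 2 threads, like runInsertAction: "execute" each action and emit nothing.
val leftover = runPar(actions, 2) { a => inserted.incrementAndGet(); Option.empty[Insert] }
```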

Here is the complete source code for this demo:

```scala
import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._
import fs2.Strategy

object ParallelExecution extends App {

  val db = Database.forConfig("h2db")

  //drop original table schema
  val futVectorTables = db.run(MTable.getTables)
  val futDropTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  Await.ready(futDropTable, Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //would carry on even if creating the table fails
  Await.ready(futCreateTable, Duration.Inf)

  //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  //wait for the truncation to finish
  Await.ready(futTruncateTable, Duration.Inf)

  //a conceived task for the purpose of resource consumption
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    //create a stream for state id with state name
    implicit def toState(row: StateTable#TableElementType) = StateModel(row.id, row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //construct a static stream from the loaded rows
    val stateStream = fda_staticSource(stateSeq)()
    var id = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid, stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }

  //another conceived task for the purpose of resource consumption
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = {
    //create a stream for county id with state name and county name
    implicit def toCounty(row: CountyTable#TableElementType) = CountyModel(row.id, row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //construct a static stream from the loaded rows
    val countyStream = fda_staticSource(countySeq)()
    var id = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid, cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

  //process input row and produce action row to insert into NORMAQM
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state, aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0, aqm.mid, stateId, countyId, aqm.year, aqm.value, aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }

  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW
  implicit def toYears(y: Int) = Years(y)
  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

  //strong row type
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)
  //shared stream loader when operating in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }
  }

  //start counter
  val cnt_start = System.currentTimeMillis()

  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case Years(y) => println(y); fda_skip
      case aqm: AQMRPTModel =>
        println(s"${aqm.year}  $aqm")
        fda_skip
      case FDAActionRow(action) =>
        println(s"${action}")
        fda_skip
      case _ => fda_skip
    }
  }

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)
  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream = fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun

  println(s"processing 219400 rows in parallel in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
}
```


Reposted from: https://www.cnblogs.com/tiger-xc/p/6652563.html
