Fig流水线式开发框架

联合创作 · 2023-09-29 14:36

Fig(无花果)是一个基于多任务流水线模型开发的运行框架:框架使用 Java 线程池控制多任务并发,并使用队列实现任务间的数据传递。

预览版仓库地址:https://oss.sonatype.org/content/repositories/snapshots
Maven 依赖:

<dependency>
  <groupId>com.github.taomus.fig</groupId>
  <artifactId>fig-core</artifactId>
  <version>0.1.0-SNAPSHOT</version>
  <type>module</type>
</dependency>

<dependency>
  <groupId>com.github.taomus.fig</groupId>
  <artifactId>fig-spring-plugin</artifactId>
  <version>0.1.0-SNAPSHOT</version>
  <type>module</type>
</dependency>

Xtend 代码示例:

package com.github.test1.stock

import com.github.taomus.fig.core.engine.Data
import com.github.taomus.fig.core.engine.Fig
import com.github.taomus.fig.core.engine.FigEngine
import com.github.taomus.fig.core.engine.Task
import com.github.test.stock.entity.StockData
import java.nio.charset.Charset
import java.util.Arrays
import java.util.Vector
import joinery.DataFrame
import org.apache.commons.lang3.SerializationUtils
import org.jsoup.Connection
import org.jsoup.Jsoup
import org.slf4j.LoggerFactory
import org.springframework.util.StreamUtils

/**
 * Example Fig module that fans a "start" item out into per-stock, per-year
 * download tasks and parses the CSV returned by the
 * quotes.money.163.com chddata service into StockData rows.
 *
 * Task routing (by the name passed to Data.create / declared in @Task):
 * "start" -> "buildData" -> "stock_history" -> "calc".
 */
class CollectStock1 extends Fig {
	val static LOG = LoggerFactory.getLogger(CollectStock1)

	/**
	 * Registers this module with the engine, runs the engine and then queues
	 * the initial "start" item.
	 */
	def static void main(String[] args){
		FigEngine.instance.addModule(CollectStock1)
		// NOTE(review): the seed item is queued after run(); this assumes run()
		// returns without blocking — confirm against FigEngine.
		FigEngine.instance.run()
		FigEngine.instance.taskQueue.put(Data.create("start",null))
	}

	/**
	 * Builds the history-download URL for one stock and one calendar year.
	 *
	 * @param code stock code; codes starting with "0" are sent with the
	 *             prefix "1", every other code with the prefix "0"
	 * @param date four-digit year; the requested range is «date»0101..«date»1231
	 * @return the URL, without any surrounding whitespace
	 */
	def String getUrl(String code,String date) {
		val prefix = if (code.startsWith("0")) "1" else "0"
		// Single-line template on purpose: a multi-line rich string would embed
		// its leading blank and line break into the URL.
		return '''http://quotes.money.163.com/service/chddata.html?code=«prefix»«code»&start=«date»0101&end=«date»1231'''.toString
	}

	/**
	 * Handles "start": emits one "buildData" item per hard-coded stock.
	 *
	 * @param value the triggering item (not read)
	 */
	@Task("start")
	def Data[] getStockCodes(Data value){
		return #[
			Data.create("buildData",new StockData("平安银行","000001")),
			Data.create("buildData",new StockData("万科A","000002"))
		]
	}

	/**
	 * Handles "buildData": for each year from 2000 to 2020 (inclusive) emits a
	 * "stock_history" item carrying a clone of the stock with its date set to
	 * that year.
	 *
	 * @param value item whose payload is a StockData
	 */
	@Task("buildData")
	def Data[] collect(Data value){
		val stockinfo = value.getData as StockData
		val results = new Vector<Data>()
		for(year : 2000..2020){
			// Clone so every emitted item owns its own StockData instance.
			val info = SerializationUtils.<StockData>clone(stockinfo)
			info.date = String.valueOf(year)
			results.add(Data.create("stock_history",info))
		}
		return results
	}

	/**
	 * Handles "stock_history": downloads one year of history for the stock in
	 * the payload, parses the CSV and emits a single "calc" item holding the
	 * parsed rows in reversed order.
	 *
	 * @param value item whose payload is a StockData with code and date set
	 * @return the "calc" item with priority 2, or null when the response holds
	 *         only a single line (no data rows)
	 */
	@Task("stock_history")
	def Data startHistory(Data value) {
		val d = value.getData() as StockData
		val url = getUrl(d.code,d.date)
		LOG.info(url)
		val response = Jsoup.connect(url)
			.method(Connection.Method.GET)
			.ignoreContentType(true)
			.timeout(10 * 1000)
			.execute()
		// StreamUtils.copyToString leaves the stream open, so close it here.
		val stream = response.bodyStream
		var String a
		try {
			a = StreamUtils.copyToString(stream, Charset.forName("UTF-8"))
		} finally {
			stream.close
		}
		LOG.info(a)
		val data = a.split("\n")
		if(data.size == 1){
			// Only the header line came back: nothing to parse.
			return null
		}
		val DataFrame<Object> df = new DataFrame("日期", "股票代码", "名称", "收盘价", "最高价", "最低价", "开盘价", "前收盘", "涨跌额", "涨跌幅",
			"换手率", "成交量", "成交金额", "总市值", "流通市值", "成交笔数");
		// Row 0 is the header, so data rows start at index 1.
		for(index : 1..(data.length - 1)){
			df.append(Arrays.asList(data.get(index).split(',')))
		}
		val results = new Vector<StockData>()
		for (Object index : df.index()) {
			val sd = new StockData()
			sd.date = df.get(index,"日期") as String
			// Drop the first character of the code column.
			// NOTE(review): presumably a leading quote mark in the CSV — confirm.
			sd.code = (df.get(index,"股票代码") as String).substring(1)
			// The name is taken from the request payload, not from the CSV.
			sd.name = d.name
			sd.opening = Float.valueOf(df.get(index,"开盘价") as String)
			sd.ending = Float.valueOf(df.get(index,"收盘价") as String)
			sd.low = Float.valueOf(df.get(index,"最低价") as String)
			sd.hig = Float.valueOf(df.get(index,"最高价") as String)
			results.add(sd)
		}
		val resdata = Data.create("calc",results.reverse)
		resdata.priority = 2
		return resdata
	}

	/**
	 * Handles "calc": placeholder that currently produces no output.
	 *
	 * @param value item whose payload is the list built by startHistory (not read)
	 */
	@Task("calc")
	def Data calc(Data value) {
		return null
	}
}

 

浏览 6
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报