欢迎来到Doc100.Net免费学习资源知识分享平台!
您的位置:首页 > 程序异常 >

数据仓库etl调度工具的开发(1)

更新时间: 2014-01-05 02:28:42 责任编辑: Author_N1

 

数据仓库ETL调度工具的开发(一)

目标

实现一款简单的ETL调度工具,支持几个关键功能:
1、作业依赖关系的配置

一个作业可能依赖于多个作业,同时也可以被多个作业依赖,前面所依赖的作业都执行成功后才能执行后面的作业

2、作业优先级的控制

可能有很多相互之间没有依赖关系的作业,哪个优先执行,需要控制

3、作业并发数的控制
控制同时运行作业的最大个数

4、作业相关的元数据

 存储在数据库,比如作业的基本信息:id,作业名称,作业类型,参数值,优先级。作业所在服务器相关信息,作业与作业依赖关系表等等

实现步骤

提供可视化界面维护作业相关的元数据,调度工具从元数据库中读取作业相关信息载入内存,大体实现步骤:

1、作业初始化
将元数据库无任何依赖关系的作业读入作业队列jobqueue,jobqueue为ArrayList类

2、作业依赖关系的控制
通过Map类分别构建Map1存储一个作业id,依赖于哪些作业id,Map2存储一个作业id,被哪些作业id依赖, 一个作业执行成功后,通过Map2通知相关的作业做检查,
借助Map1检索到其子作业,看是否都成功

3、作业优先级的控制
每对jobqueue add一次,重新对jobqueue排序一次,借助Collections.sort,按优先级,提交时间排序

4、作业并发数的控制
作业的Thread类内,声明volatile型变量代表正运行作业的个数,作业运行前以及作业运行完后,加锁修改

 

源码

以下为控制远程shell脚本作业的Java实现源码,其他类型的作业可直接扩展相应的方法:

 作业类

/**
 * One entry of the scheduler's job queue.
 *
 * <p>Holds the three values the scheduler needs to order and launch a job:
 * its priority, the timestamp passed in when the entry was created, and the
 * job id. The fields are package-private because the scheduler reads
 * {@code jobid} directly.
 */
public class JobInfo {
	/** Priority used when ordering the queue. */
	int priority;
	/** Timestamp supplied by the creator of this entry (the scheduler passes epoch milliseconds). */
	long starttime;
	/** Identifier of the job this entry stands for. */
	int jobid;

	/**
	 * Creates a queue entry.
	 *
	 * @param priority  priority of the job
	 * @param starttime timestamp to associate with the entry
	 * @param jobid     identifier of the job
	 */
	public JobInfo(int priority,long starttime,int jobid)
	{
		this.jobid=jobid;
		this.starttime=starttime;
		this.priority=priority;
	}

	/** @return the priority given at construction */
	public int getPriority()
	{
		return this.priority;
	}

	/** @return the timestamp given at construction */
	public long getStarttime()
	{
		return this.starttime;
	}

	/** @return the job id rendered as a decimal string */
	@Override
	public String toString()
	{
		return Integer.toString(this.jobid);
	}
}

 

主类

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


/**
 * ETL job scheduler.
 *
 * <p>Jobs without any prerequisite are placed on the job queue during
 * {@link #init()}. Two maps describe the dependency graph: {@code refedmap}
 * stores, per job, the jobs that depend on it (so that a finished job can
 * tell them to re-check whether they may be queued), and {@code refmap}
 * stores, per job, the jobs it depends on (used to check whether all
 * prerequisites have finished successfully).
 *
 * <p>{@link #start()} keeps launching queued jobs while fewer than
 * {@value #MAX_CONCURRENT_JOBS} job threads are alive. It exits after more
 * than {@value #MAX_IDLE_ROUNDS} consecutive rounds in which the queue was
 * empty and nothing was running; launching a job or hitting the concurrency
 * limit resets that idle counter.
 */
public class EtlControl {
	/** Maximum number of job threads that may be alive at the same time. */
	private static final int MAX_CONCURRENT_JOBS=5;
	/** Longest time one wait on the empty queue lasts, in milliseconds. */
	private static final long IDLE_WAIT_MILLIS=6000;
	/** Pause taken when the concurrency limit is reached, in milliseconds. */
	private static final long BUSY_SLEEP_MILLIS=3000;
	/** Number of consecutive idle rounds after which {@link #start()} returns. */
	private static final int MAX_IDLE_ROUNDS=20;

	/** Jobs that do not depend on any other job. */
	private static final String SQL_ROOT_JOBS="select jobid,priority from etlcontrol_job_info a where not exists (select 1  from etlcontrol  b where a.jobid=b.thisjob)";
	/** All jobs. */
	private static final String SQL_ALL_JOBS="select  jobid from etlcontrol_job_info  ";
	/** Jobs the given job depends on. */
	private static final String SQL_REF="select refjob from  etlcontrol where thisjob=?";
	/** Jobs that depend on the given job. */
	private static final String SQL_REFED="select thisjob from  etlcontrol where refjob=?";
	/** Source server records. */
	private static final String SQL_SOURCE="select source_id,ip,dir,username,password1,nvl(db_sid,'NA'),nvl(port,'NA'),nvl(logdir,'NA') from  etlcontrol_job_source";
	/** Basic information of every job. */
	private static final String SQL_JOB_INFO="select jobid,jobname,kind,source_id,priority from etlcontrol_job_info";
	/** Ordered parameter values of one job. */
	private static final String SQL_JOB_PARAMS="select param_value from etlcontrol_job_paramvalue where jobid=?  order by seq";
	/** Ids of the jobs that have parameters. */
	private static final String SQL_JOBS_WITH_PARAMS="select distinct jobid from etlcontrol_job_paramvalue";

	/**
	 * Queue order: higher priority first, then earlier timestamp first.
	 * This is the same order LaunchJob.resortJobqueue() applies after each add.
	 */
	private static final Comparator<JobInfo> QUEUE_ORDER=new Comparator<JobInfo>() {
		public int compare(JobInfo a,JobInfo b)
		{
			if (a.getPriority()!=b.getPriority())
			{
				return a.getPriority()>b.getPriority() ? -1 : 1;
			}
			if (a.getStarttime()==b.getStarttime())
			{
				return 0;
			}
			return a.getStarttime()<b.getStarttime() ? -1 : 1;
		}
	};

	/** Jobs ready to run; also the monitor worker threads notify after adding to it. */
	ArrayList<JobInfo> jobqueue;
	/** job id -> ids of the jobs it depends on. */
	Map<Integer,ArrayList> refmap;
	/** job id -> ids of the jobs that depend on it. */
	Map<Integer,ArrayList> refedmap;
	/** job id -> status written by the job threads. */
	Map<Integer,Integer> stautsmap;
	/** source id -> [ip, dir, username, password, db_sid, port, logdir]. */
	Map<Integer,ArrayList<String>> sourcemap;
	/** job id -> [jobname, kind, source_id, priority]. */
	Map<Integer,ArrayList<String>> jobinfomap;
	/** job id -> parameter values ordered by seq. */
	Map<Integer,ArrayList<String>> jobparmap;

	/**
	 * Stores the shared collections. They are filled by {@link #init()} and
	 * handed unchanged to every launched job thread.
	 */
	@SuppressWarnings("rawtypes")
	public EtlControl(ArrayList<JobInfo> jobqueue,Map<Integer,ArrayList> refmap,Map<Integer,ArrayList> refedmap,Map<Integer,Integer> stautsmap,Map<Integer,ArrayList<String>> sourcemap
		,	Map<Integer,ArrayList<String>> jobinfomap,Map<Integer,ArrayList<String>> jobparmap)
	{
		this.jobqueue=jobqueue;
		this.refedmap=refedmap;
		this.refmap=refmap;
		this.stautsmap=stautsmap;
		this.sourcemap=sourcemap;
		this.jobinfomap=jobinfomap;
		this.jobparmap=jobparmap;
	}

	/**
	 * Loads all scheduling metadata from the database into the shared
	 * collections: the initial queue, both dependency maps, the job
	 * parameters, the source servers and the basic job information.
	 *
	 * <p>Any exception is printed and swallowed, as before, so a failure
	 * leaves the collections partially filled. All JDBC resources are
	 * closed whether or not loading succeeds.
	 */
	public void init()
	{
		DBUtil dbutil=new DBUtil();
		Connection conn=null;
		Statement stmt=null;
		try
		{
			conn=dbutil.getConnection("oracle");
			stmt=conn.createStatement();

			loadInitialQueue(stmt);
			loadDependencies(conn,stmt);
			loadJobParameters(conn,stmt);
			loadSources(stmt);
			loadJobInfo(stmt);
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			// Statement before connection; each close is guarded separately so
			// one failure cannot prevent the other.
			safeClose(stmt);
			safeClose(conn);
		}
	}

	/** Queues every job without prerequisites, then orders the queue by priority. */
	private void loadInitialQueue(Statement stmt) throws java.sql.SQLException
	{
		ResultSet rs=stmt.executeQuery(SQL_ROOT_JOBS);
		try
		{
			while (rs.next())
			{
				int jobid=rs.getInt(1);
				// Column 2 is the priority; column 1 is the job id.
				int priority=rs.getInt(2);
				jobqueue.add(new JobInfo(priority,System.currentTimeMillis(),jobid));
			}
		}
		finally
		{
			safeClose(rs);
		}
		// Later additions are re-sorted by the job threads; order the initial batch the same way.
		Collections.sort(jobqueue,QUEUE_ORDER);
	}

	/**
	 * Fills {@code refmap} and {@code refedmap} for every job. Jobs with an
	 * empty list are not entered into the respective map.
	 */
	private void loadDependencies(Connection conn,Statement stmt) throws java.sql.SQLException
	{
		PreparedStatement psRef=null;
		PreparedStatement psRefed=null;
		ResultSet rs=null;
		try
		{
			// Prepared once and re-executed per job instead of once per iteration.
			psRef=conn.prepareStatement(SQL_REF);
			psRefed=conn.prepareStatement(SQL_REFED);
			rs=stmt.executeQuery(SQL_ALL_JOBS);
			while (rs.next())
			{
				int thisjob=rs.getInt(1);

				// Non-positive refjob values are not treated as dependencies.
				// NOTE(review): a job whose only etlcontrol rows have refjob<=0 is
				// neither queued initially nor released later - confirm the schema.
				ArrayList<Integer> reflist=queryJobIds(psRef,thisjob,true);
				if (reflist.size()>0)
				{
					refmap.put(thisjob, reflist);
				}

				ArrayList<Integer> refedlist=queryJobIds(psRefed,thisjob,false);
				if (refedlist.size()>0)
				{
					refedmap.put(thisjob, refedlist);
				}
			}
		}
		finally
		{
			safeClose(rs);
			safeClose(psRefed);
			safeClose(psRef);
		}
	}

	/** Runs a one-parameter statement for {@code jobid} and collects column 1 as ints. */
	private static ArrayList<Integer> queryJobIds(PreparedStatement ps,int jobid,boolean positiveOnly) throws java.sql.SQLException
	{
		ArrayList<Integer> ids=new ArrayList<Integer>();
		ps.setInt(1, jobid);
		ResultSet rs=ps.executeQuery();
		try
		{
			while (rs.next())
			{
				int id=rs.getInt(1);
				if (!positiveOnly || id>0)
				{
					ids.add(id);
				}
			}
		}
		finally
		{
			safeClose(rs);
		}
		return ids;
	}

	/** Loads the ordered parameter values of every job that has parameters. */
	private void loadJobParameters(Connection conn,Statement stmt) throws java.sql.SQLException
	{
		PreparedStatement psParams=null;
		ResultSet rs=null;
		try
		{
			psParams=conn.prepareStatement(SQL_JOB_PARAMS);
			rs=stmt.executeQuery(SQL_JOBS_WITH_PARAMS);
			while (rs.next())
			{
				int thisjob=rs.getInt(1);
				System.out.println("将有参数的作业参数值写入内存::"+thisjob);
				ArrayList<String> values=new ArrayList<String>();
				psParams.setInt(1, thisjob);
				ResultSet rsValues=psParams.executeQuery();
				try
				{
					while (rsValues.next())
					{
						values.add(rsValues.getString(1));
					}
				}
				finally
				{
					safeClose(rsValues);
				}
				jobparmap.put(thisjob, values);
			}
		}
		finally
		{
			safeClose(rs);
			safeClose(psParams);
		}
	}

	/** Loads the source server records keyed by source id. */
	private void loadSources(Statement stmt) throws java.sql.SQLException
	{
		ResultSet rs=stmt.executeQuery(SQL_SOURCE);
		try
		{
			while (rs.next())
			{
				ArrayList<String> sourcelist=new ArrayList<String>();
				// Columns 2..8: ip, dir, username, password, db_sid, port, logdir.
				for (int col=2;col<=8;col++)
				{
					sourcelist.add(rs.getString(col));
				}
				sourcemap.put(rs.getInt(1), sourcelist);
			}
		}
		finally
		{
			safeClose(rs);
		}
	}

	/** Loads name, kind, source id and priority of every job keyed by job id. */
	private void loadJobInfo(Statement stmt) throws java.sql.SQLException
	{
		ResultSet rs=stmt.executeQuery(SQL_JOB_INFO);
		try
		{
			while (rs.next())
			{
				ArrayList<String> jobinfolist=new ArrayList<String>();
				// Columns 2..5: jobname, kind, source_id, priority.
				for (int col=2;col<=5;col++)
				{
					jobinfolist.add(rs.getString(col));
				}
				jobinfomap.put(rs.getInt(1), jobinfolist);
			}
		}
		finally
		{
			safeClose(rs);
		}
	}

	/** Closes a result set if present; a failure is printed, not propagated. */
	private static void safeClose(ResultSet rs)
	{
		if (rs!=null)
		{
			try { rs.close(); } catch (Exception e) { e.printStackTrace(); }
		}
	}

	/** Closes a statement if present; a failure is printed, not propagated. */
	private static void safeClose(Statement st)
	{
		if (st!=null)
		{
			try { st.close(); } catch (Exception e) { e.printStackTrace(); }
		}
	}

	/** Closes a connection if present; a failure is printed, not propagated. */
	private static void safeClose(Connection c)
	{
		if (c!=null)
		{
			try { c.close(); } catch (Exception e) { e.printStackTrace(); }
		}
	}

	/**
	 * Runs the scheduling loop until the scheduler has been idle (empty
	 * queue, no live job thread) for more than {@value #MAX_IDLE_ROUNDS}
	 * consecutive rounds.
	 *
	 * <p>The number of running jobs is the number of launched job threads
	 * that are still alive. It is counted here rather than read from
	 * {@code LaunchJob.scnt}, because that counter is only raised after a
	 * job has opened its SSH session, which lets the loop overshoot the limit.
	 *
	 * @throws InterruptedException if the thread is interrupted while waiting or sleeping
	 */
	public void start() throws InterruptedException
	{
		int idleRounds=0; // consecutive rounds with an empty queue and nothing running
		int running=0;    // live job threads as counted in the previous round
		List<LaunchJob> launched=new ArrayList<LaunchJob>();
		while (true)
		{
			synchronized (jobqueue) {
				if (jobqueue.isEmpty())
				{
					if (running==0)
					{
						idleRounds++;
					}
					// Woken early by a job thread that has queued a dependent job.
					jobqueue.wait(IDLE_WAIT_MILLIS);
				}
			}

			running=pruneFinished(launched);
			System.out.println("status is S:"+running);

			if (running<MAX_CONCURRENT_JOBS)
			{
				JobInfo jobinfo=null;
				// Job threads add to the queue under this lock, so take from it under the lock too.
				synchronized (jobqueue) {
					if (!jobqueue.isEmpty())
					{
						jobinfo=jobqueue.remove(0);
					}
				}
				if (jobinfo!=null && jobinfo.jobid>0)
				{
					idleRounds=0;
					LaunchJob lj=new LaunchJob(jobinfo.jobid,refmap,refedmap,stautsmap, jobqueue,sourcemap,jobinfomap,jobparmap);
					lj.start();
					launched.add(lj);
				}
			}
			else
			{
				// At the concurrency limit: pause before looking again.
				idleRounds=0;
				Thread.sleep(BUSY_SLEEP_MILLIS);
			}

			if (idleRounds>MAX_IDLE_ROUNDS)
			{
				System.out.println("exit");
				break;
			}
		}
	}

	/** Drops finished threads from {@code launched} and returns how many are still alive. */
	private static int pruneFinished(List<LaunchJob> launched)
	{
		for (Iterator<LaunchJob> it=launched.iterator();it.hasNext();)
		{
			if (!it.next().isAlive())
			{
				it.remove();
			}
		}
		return launched.size();
	}

	/**
	 * Creates the shared collections, loads the metadata, prints it, runs the
	 * scheduler and finally prints the status of every job.
	 */
	public static void main(String[] args) throws InterruptedException
	{
		ArrayList<JobInfo> jobqueue =new ArrayList<JobInfo>();
		Map<Integer,ArrayList> refmap=new ConcurrentHashMap<Integer,ArrayList>();
		Map<Integer,ArrayList> refedmap=new ConcurrentHashMap<Integer,ArrayList>();
		Map<Integer,Integer> stautsmap=new ConcurrentHashMap<Integer,Integer>();
		Map<Integer,ArrayList<String>> sourcemap=new ConcurrentHashMap<Integer,ArrayList<String>>();
		Map<Integer,ArrayList<String>> jobinfomap=new ConcurrentHashMap<Integer,ArrayList<String>>();
		Map<Integer,ArrayList<String>> jobparmap=new ConcurrentHashMap<Integer,ArrayList<String>>();

		EtlControl ec=new EtlControl(jobqueue,refmap,refedmap,stautsmap,sourcemap,jobinfomap,jobparmap);
		ec.init();
		System.out.println("refmap: "+refmap.toString());
		System.out.println("refedmap: " +refedmap.toString());
		System.out.println("jobqueue.toString(): "+jobqueue.toString());
		System.out.println("sourcemap: "+sourcemap.toString());
		System.out.println("jobinfomap: "+jobinfomap.toString());
		System.out.println("jobparmap: "+jobparmap.toString());
		ec.start();
		System.out.println(stautsmap.toString());
	}

}


作业线程类

import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;


import ch.ethz.ssh2.ChannelCondition;
import ch.ethz.ssh2.Connection; 
import ch.ethz.ssh2.Session; 
import ch.ethz.ssh2.StreamGobbler;


/**
 * Thread that runs one job on a remote host over SSH and, when the job
 * succeeds, queues the dependent jobs whose prerequisites have all succeeded.
 *
 * <p>Status values written to {@code stautsmap}: {@value #STATUS_RUNNING}
 * while the job is queued or running, the remote exit status once it has
 * finished ({@code 0} means success), and {@value #STATUS_FAILED} when the
 * job could not be run or no exit status was reported.
 */
public class LaunchJob extends Thread

{
	/** Time to wait for the exit status after stdout has ended, in milliseconds. */
	private static final long TIME_OUT = 100;
	/** Status of a job that has been queued or is running. */
	private static final int STATUS_RUNNING = 9;
	/** Status of a job that could not be run or whose exit status is unknown. */
	private static final int STATUS_FAILED = -1;

	/** Number of jobs currently executing a remote command. */
	public static volatile int scnt=0;
	int job;
	private int jobstatus=STATUS_RUNNING;
	/** job id -> ids of the jobs it depends on. */
	Map<Integer,ArrayList> refmap;
	/** job id -> ids of the jobs that depend on it. */
	Map<Integer,ArrayList> refedmap;
	/** job id -> status. */
	Map<Integer,Integer> stautsmap;
	/** source id -> [ip, dir, username, password, db_sid, port, logdir]. */
	Map<Integer,ArrayList<String>> sourcemap;
	/** job id -> [jobname, kind, source_id, priority]. */
	Map<Integer,ArrayList<String>> jobinfomap;
	/** Shared queue of runnable jobs; also used as the lock and monitor for queue changes. */
	ArrayList<JobInfo> jobqueue;
	/** job id -> parameter values. */
	Map<Integer,ArrayList<String>> jobparmap;

	/**
	 * @param job id of the job this thread runs; the remaining arguments are
	 *            the collections shared with the scheduler and the other job threads
	 */
	public LaunchJob(int job,Map<Integer,ArrayList> refmap,Map<Integer,ArrayList> refedmap,Map<Integer,Integer> stautsmap,ArrayList<JobInfo> jobqueue,Map<Integer,ArrayList<String>> sourcemap,
			Map<Integer,ArrayList<String>> jobinfomap,Map<Integer,ArrayList<String>> jobparmap)
	{
		this.job=job;
		this.refmap=refmap;
		this.refedmap=refedmap;
		this.stautsmap=stautsmap;
		this.jobqueue=jobqueue;
		this.sourcemap=sourcemap;
		this.jobinfomap=jobinfomap;
		this.jobparmap=jobparmap;
	}

	public void setJobstatus(int stauts)
	{
		this.jobstatus=stauts;
	}

	public int getJobstatus()
	{
		return jobstatus;
	}

	/** Raises the running-job counter; synchronized because {@code scnt++} is not atomic. */
	public static synchronized void incrementScnt()
	{
		scnt++;
	}

	/** Lowers the running-job counter. */
	public static synchronized void reduceScnt()
	{
		scnt--;
	}

	/**
	 * Looks up the job's name, kind and source id and runs it if its kind is
	 * {@code shell} or {@code mr}; other kinds are ignored. A job without
	 * metadata is recorded as failed instead of ending the thread with an
	 * exception.
	 */
	@Override
	public void run()
	{
		ArrayList<String> jobinfolist=jobinfomap.get(job);
		if (jobinfolist==null)
		{
			recordFailure("no job information for job "+job);
			return;
		}
		String jobname=jobinfolist.get(0);
		String jobkind=jobinfolist.get(1);
		int source_id=Integer.parseInt(jobinfolist.get(2));

		if ("shell".equals(jobkind)||"mr".equals(jobkind))
		{
			execShellOrMr(jobname,source_id);
		}
	}

	public void execProc()
	{

	}

	public void execMr()
	{

	}

	/**
	 * Runs {@code sh <dir>/<jobname> <params...>} on the host registered under
	 * {@code source_id}, with stderr redirected to an error log on that host,
	 * echoes the remote stdout, records the exit status and then releases
	 * dependent jobs through {@link #checkAndJoinQue()}.
	 *
	 * <p>An SSH failure or a missing exit status marks this job as failed;
	 * it no longer terminates the JVM, so the other jobs keep running. The
	 * session and connection are always closed and the running counter is
	 * always restored.
	 *
	 * @param jobname   script name, relative to the source's directory
	 * @param source_id key into {@code sourcemap}
	 */
	public void execShellOrMr(String jobname,int source_id)
	{
		// Publish the "running" status first.
		stautsmap.put(job, getJobstatus());

		ArrayList<String> source=sourcemap.get(source_id);
		if (source==null)
		{
			recordFailure("no source "+source_id+" for job "+job);
			return;
		}
		String hostname=source.get(0);
		String dir=source.get(1);
		String username=source.get(2);
		String password=source.get(3);
		String dirlog=source.get(6)+"/"+jobname.replace(" ", "_")+"_error.log";

		// The command is built from scheduler metadata, which is assumed to be trusted.
		StringBuilder sbcommand=new StringBuilder("sh ").append(dir).append("/").append(jobname);
		ArrayList<String> jobparlist=jobparmap.get(job);
		if (jobparlist!=null)
		{
			for (String param : jobparlist)
			{
				sbcommand.append(" ").append(param);
			}
		}
		System.out.println(sbcommand.toString());

		Connection conn=null;
		Session sess=null;
		boolean counted=false; // whether scnt was raised and must be lowered again
		try
		{
			conn=new Connection(hostname);
			conn.connect();
			if (!conn.authenticateWithPassword(username, password))
			{
				throw new IOException("Authentication failed.");
			}
			sess=conn.openSession();

			incrementScnt();
			counted=true;

			System.out.println(sbcommand.toString());
			sess.execCommand(sbcommand.toString()+" 2>"+dirlog);

			System.out.println("Here is some information about the remote host:");
			InputStream stdout=new StreamGobbler(sess.getStdout());
			BufferedReader br=new BufferedReader(new InputStreamReader(stdout));
			String line;
			while ((line=br.readLine())!=null)
			{
				System.out.println(line);
			}

			sess.waitForCondition(ChannelCondition.EXIT_STATUS, TIME_OUT);
			// The exit status may be unavailable (null); do not unbox it blindly.
			Integer exit=sess.getExitStatus();
			setJobstatus(exit==null ? STATUS_FAILED : exit.intValue());
		}
		catch (IOException e)
		{
			e.printStackTrace(System.err);
			setJobstatus(STATUS_FAILED);
		}
		finally
		{
			if (counted)
			{
				reduceScnt();
			}
			if (sess!=null)
			{
				sess.close();
			}
			if (conn!=null)
			{
				conn.close();
			}
		}

		System.out.println("ExitCode: " + getJobstatus());
		stautsmap.put(job, Integer.valueOf(getJobstatus()));

		checkAndJoinQue();
	}

	/** Prints the reason and records this job as failed in {@code stautsmap}. */
	private void recordFailure(String reason)
	{
		System.err.println(reason);
		setJobstatus(STATUS_FAILED);
		stautsmap.put(job, Integer.valueOf(STATUS_FAILED));
	}

	/**
	 * If this job succeeded, queues every dependent job whose prerequisites
	 * have all succeeded, re-sorts the queue and wakes the scheduler.
	 *
	 * <p>The check for "already queued or started" and the enqueue happen
	 * under the queue lock, and the dependent is given a status entry at that
	 * moment. Without this, two prerequisites that finish at about the same
	 * time would both queue the same dependent.
	 */
	@SuppressWarnings("unchecked") // the maps are declared with raw ArrayList values but hold Integer ids
	public void checkAndJoinQue()
	{
		if (getJobstatus()!=0)
		{
			return;
		}
		ArrayList<Integer> refedjobs=refedmap.get(job);
		if (refedjobs==null)
		{
			return;
		}
		for (Integer dependent : refedjobs)
		{
			int job1=dependent.intValue();
			if (!prerequisitesSucceeded(job1))
			{
				continue;
			}
			ArrayList<String> jobinfolist=jobinfomap.get(job1);
			if (jobinfolist==null)
			{
				System.err.println("no job information for job "+job1);
				continue;
			}
			int priority=Integer.parseInt(jobinfolist.get(3));
			synchronized(jobqueue)
			{
				if (stautsmap.containsKey(job1))
				{
					// Another finished prerequisite has already queued it.
					continue;
				}
				stautsmap.put(job1, Integer.valueOf(STATUS_RUNNING));
				jobqueue.add(new JobInfo(priority,new Date().getTime(),job1));
				resortJobqueue();
				System.out.println("jobqueue.toString(): "+jobqueue.toString());
				jobqueue.notifyAll();
			}
		}
	}

	/** Returns whether every job that {@code candidate} depends on has status 0. */
	@SuppressWarnings("unchecked")
	private boolean prerequisitesSucceeded(int candidate)
	{
		ArrayList<Integer> refjobs=refmap.get(candidate);
		if (refjobs==null)
		{
			return false;
		}
		for (Integer prerequisite : refjobs)
		{
			Integer status=stautsmap.get(prerequisite);
			// No status yet, or a non-zero status, blocks the candidate.
			if (status==null || status.intValue()!=0)
			{
				return false;
			}
		}
		return true;
	}

	/**
	 * Sorts the queue by priority (higher first), then by timestamp (earlier
	 * first). Values are compared directly rather than subtracted, so extreme
	 * priorities cannot overflow.
	 */
	public void resortJobqueue()
	{
		Comparator<JobInfo> comp = new Comparator<JobInfo>() {
			public int compare(JobInfo o1, JobInfo o2) {
				int p1=o1.getPriority();
				int p2=o2.getPriority();
				if (p1!=p2)
				{
					return p1>p2 ? -1 : 1;
				}
				long t1=o1.getStarttime();
				long t2=o2.getStarttime();
				if (t1==t2)
				{
					return 0;
				}
				return t1<t2 ? -1 : 1;
			}
		};

		synchronized (jobqueue) {
			Collections.sort(jobqueue, comp);
		}
	}
}



 

上一篇:上一篇
下一篇:下一篇

 

随机推荐程序问答结果

 

 

如对文章有任何疑问请提交到问题反馈,或者您对内容不满意,请您反馈给我们DOC100.NET论坛发贴求解。
DOC100.NET资源网,机器学习分类整理更新日期::2014-01-05 02:28:42
如需转载,请注明文章出处和来源网址:http://www.doc100.net/bugs/t/10242/
本文WWW.DOC100.NET DOC100.NET版权所有。