首页> 中国专利> 数据集成系统和数据集成方法

数据集成系统和数据集成方法

摘要

本发明提供了一种数据集成系统和数据集成方法,其中,该数据集成系统包括:日志挖掘单元,用于通过第一线程对源数据库进行日志挖掘,将挖掘出的数据中符合第一预设条件的数据实例成对象,把对象存储至缓存;处理单元,用于通过第二线程轮询缓存,以从缓存中获取符合第二预设条件的对象进行处理得到处理数据,将处理数据发送至消息队列;写入单元,用于通过第三线程轮询消息队列,并在消息队列中存在消息时获取消息,对消息进行处理得到目标语句,将语句写入目标数据库。通过本申请的技术方案,可以有效地提高数据集成的稳定性和数据集成的效率。

著录项

  • 公开/公告号CN103488690A

    专利类型发明专利

  • 公开/公告日2014-01-01

    原文格式PDF

  • 申请/专利权人 用友软件股份有限公司;

    申请/专利号CN201310392454.3

  • 发明设计人 衡骏;

    申请日2013-09-02

  • 分类号G06F17/30(20060101);

  • 代理机构北京友联知识产权代理事务所(普通合伙);

  • 代理人尚志峰;汪海屏

  • 地址 100094 北京市海淀区北清路68号用友软件园

  • 入库时间 2024-02-19 21:48:50

法律信息

  • 法律状态公告日

    法律状态信息

    法律状态

  • 2017-06-30

    授权

    授权

  • 2015-12-02

    著录事项变更 IPC(主分类):G06F17/30 变更前: 变更后: 申请日:20130902

    著录事项变更

  • 2014-02-05

    实质审查的生效 IPC(主分类):G06F17/30 申请日:20130902

    实质审查的生效

  • 2014-01-01

    公开

    公开

说明书

技术领域

本发明涉及数据处理技术领域,具体而言,涉及一种数据集成系统和 一种数据集成方法。

背景技术

目前在云计算环境下BI(商业智能)战略变得越来越重要,而在商 业智能中实时的数据集成在其中占有很重要的部分,需要一种完成数据抽 取,转换,加载,分析等一整套数据处理流程的方法。

目前现有的实时数据集成的方法稳定性能比较差,对于列存储的数据 仓库如gbase也没有很好的支持,并且效率也不是很高,因此有必要对现 有的增量实时数据集成进行优化。

发明内容

本发明正是基于上述问题,提出了一种数据集成技术,能够有效地提 高数据集成的稳定性和数据集成的效率。

有鉴于此,本发明提出了一种数据集成系统,包括:日志挖掘单元, 用于通过第一线程对源数据库进行日志挖掘,将挖掘出的数据中符合第一 预设条件的数据实例成对象,把所述对象存储至缓存;处理单元,用于通 过第二线程轮询所述缓存,以从所述缓存中获取符合第二预设条件的对象 进行处理得到处理数据,将所述处理数据发送至消息队列;写入单元,用 于通过第三线程轮询所述消息队列,并在所述消息队列中存在消息时获取 所述消息,对所述消息进行处理得到目标语句,将所述语句写入目标数据 库。

在上述技术方案中,优选地,还包括:持久化单元,用于对所述挖掘 出的数据进行持久化。

在上述技术方案中,优选地,还包括:记录单元,用于通过所述第二 线程记录所述处理数据的标识。

在上述技术方案中,优选地,所述写入单元还用于通过所述第三线程 将所述目标语句批量写入所述目标数据库。

在上述技术方案中,优选地,所述第一线程、所述第二线程和所述第 三线程是异步执行的。

本申请还提出了一种数据集成方法,包括:步骤202,通过第一线程 对源数据库进行日志挖掘,将挖掘出的数据中符合第一预设条件的数据实 例成对象,把所述对象存储至缓存;步骤204,通过第二线程轮询所述缓 存,以从所述缓存中获取符合第二预设条件的对象进行处理得到处理数 据,将所述处理数据发送至消息队列;步骤206,通过第三线程轮询所述 消息队列,并在所述消息队列中存在消息时获取所述消息,对所述消息进 行处理得到目标语句,将所述语句写入目标数据库。

在上述技术方案中,优选地,所述步骤202还包括:对所述挖掘出的 数据进行持久化。

在上述技术方案中,优选地,所述步骤204还包括:通过所述第二线 程记录所述处理数据的标识。

在上述技术方案中,优选地,所述步骤206还包括:通过所述第三线 程将所述目标语句批量写入所述目标数据库。

在上述技术方案中,优选地,所述第一线程、所述第二线程和所述第 三线程是异步执行的。

通过以上技术方案,可以有效地提高数据集成的稳定性和数据集成的 效率。

附图说明

图1示出了根据本发明的实施例的数据集成系统的示意框图;

图2示出了根据本发明的实施例的数据集成方法的示意流程图;

图3示出了根据本发明的实施例的数据集成系统的结构示意图;

图4示出了根据本发明的实施例的数据流向示意图;

图5示出了根据本发明的实施例的数据挖掘和发送至消息队列的示意 流程;

图6示出了根据本发明的实施例的数据写入目标数据库的示意流程 图。

具体实施方式

为了能够更清楚地理解本发明的上述目的、特征和优点,下面结合附 图和具体实施方式对本发明进行进一步的详细描述。需要说明的是,在不 冲突的情况下,本申请的实施例及实施例中的特征可以相互组合。

在下面的描述中阐述了很多具体细节以便于充分理解本发明,但是, 本发明还可以采用其他不同于在此描述的其他方式来实施,因此,本发明 的保护范围并不受下面公开的具体实施例的限制。

图1示出了根据本发明的实施例的数据集成系统的示意框图。

如图1所示,根据本发明的实施例的数据集成系统100包括:日志挖 掘单元102,用于通过第一线程对源数据库进行日志挖掘,将挖掘出的数 据中符合第一预设条件的数据实例成对象,把对象存储至缓存;处理单元 104,用于通过第二线程轮询缓存,以从缓存中获取符合第二预设条件的 对象进行处理得到处理数据,将处理数据发送至消息队列;写入单元 106,用于通过第三线程轮询消息队列,并在消息队列中存在消息时获取 消息,对消息进行处理得到目标语句,将语句写入目标数据库。

从源端数据库中挖掘出数据,并将数据写入目标数据库,需要先对源 端数据库的进行日志挖掘,然后将挖掘出的数据放入缓存队列,再将数据 发送至消息队列,最后写入到目标数据库,通过为日志挖掘操作、将数据 发送至消息队列操作和写入目标数据库三步操作分别设置线程,可以使得 上述三个操作同时进行,比如通过对源端数据库进行日志挖掘操作得到数 据A,然后将数据A放入缓存队列,在将缓存中的数据A发送至消息的 队列时,进行日志挖掘操作的线程(即第一线程)可以继续对源端数据进 行日志挖掘,并且将数据写入目标数据库的线程(即第三线程)也可以在 此时将在数据A之前已经存在于消息队列中的其他数据,继续写入目标数 据库,而无需等待数据A写入目标数据库之后才能再次对远端数据库进行 日志挖掘,也无需等待数据A写入目标数据库之后才将消息队列中的其他 数据写入目标数据库,从整体上提高了数据获取、处理和写入的效率,提 高了实时数据集成的效率。

其中的第一预设条件和第二预设条件,可以预先设置一张数据表来规 定所需的业务数据,对于即将实例化的数据和即将从缓存中发送至消息队 列的数据,可以判断其是否输入预先设置的表格中所规定的内容,若不属 于,说明该数据并不是用户所需,则可以直接删除,从而减少对无用数据 的处理,提高了实时数据集成的效率。

优选地,还包括:持久化单元108,用于对挖掘出的数据进行持久 化。

由于进行日志挖掘得到的内容都存在于缓存中,如果进行日志挖掘的 线程(即第一线程)中断或停止,那么存在于缓存中的内容将全部丢失, 进而导致进行需要同步的操作数据丢失,通过将挖掘出的数据进行持久化 处理,比如持久化到本地日志中,从而可以在进行日志挖掘的线程中断或 停止的情况下,依然能够通过查询本地日志来调用挖掘出的数据,提高了 实时数据集成的稳定性。

优选地,还包括:记录单元110,用于通过第二线程记录处理数据的 标识。

若第二线程发生中断或停止,那么挖掘出的数据会由于没有发送到消 息队列中而丢失,通过记录处理数据设置标识,具体可以是在每一个处理 数据发送到消息队列之前,都记录下该数据当前的SCN号,从而避免第 二线程崩溃而导致处理数据丢失,提高了实时数据集成的稳定性。

优选地,写入单元106还用于通过第三线程将目标语句批量写入目标 数据库。

可以在目标语句积累到一定数量后,通过批量导入的方式,将目标语 句写入目标数据库,从而避免逐条写入语句而造成的频繁交互,提高了实 时数据集成的效率。

优选地,还包括:所述第一线程、所述第二线程和所述第三线程是异 步执行的。从而提高每步操作的同步率,进而提高实时数据集成的效率。

图2示出了根据本发明的实施例的数据集成方法的示意流程图。

如图2所示,根据本发明的实施例的数据集成方法包括:步骤202, 通过第一线程对源数据库进行日志挖掘,将挖掘出的数据中符合第一预设 条件的数据实例成对象,把对象存储至缓存;步骤204,通过第二线程轮 询缓存,以从缓存中获取符合第二预设条件的对象进行处理得到处理数 据,将处理数据发送至消息队列;步骤206,通过第三线程轮询消息队 列,并在消息队列中存在消息时获取消息,对消息进行处理得到目标语 句,将语句写入目标数据库。

从源端数据库中挖掘出数据,并将数据写入目标数据库,需要先对源 端数据库的进行日志挖掘,然后将挖掘出的数据放入缓存队列,再将数据 发送至消息队列,最后写入到目标数据库,通过为日志挖掘操作、将数据 发送至消息队列操作和写入目标数据库三步操作分别设置线程,可以使得 上述三个操作同时进行,比如通过对源端数据库进行日志挖掘操作得到数 据A,然后将数据A放入缓存队列,在将缓存中的数据A发送至消息的 队列时,进行日志挖掘操作的线程(即第一线程)可以继续对源端数据进 行日志挖掘,并且将数据写入目标数据库的线程(即第三线程)也可以在 此时将在数据A之前已经存在于消息队列中的其他数据,继续写入目标数 据库,而无需等待数据A写入目标数据库之后才能再次对远端数据库进行 日志挖掘,也无需等待数据A写入目标数据库之后才将消息队列中的其他 数据写入目标数据库,从整体上提高了数据获取、处理和写入的效率,提 高了实时数据集成的效率。

在上述技术方案中,优选地,步骤202还包括:对挖掘出的数据进行 持久化。

由于进行日志挖掘得到的内容都存在于缓存中,如果进行日志挖掘的 线程(即第一线程)中断或停止,那么存在于缓存中的内容将全部丢失, 进而导致进行需要同步的操作数据丢失,通过将挖掘出的数据进行持久化 处理,比如持久化到本地日志中,从而可以在进行日志挖掘的线程中断或 停止的情况下,依然能够通过查询本地日志来调用挖掘出的数据,提高了 实时数据集成的稳定性。

在上述技术方案中,优选地,步骤204还包括:通过第二线程记录处 理数据的标识。

若第二线程发生中断或停止,那么挖掘出的数据会由于没有发送到消 息队列中而丢失,通过记录处理数据设置标识,具体可以是在每一个处理 数据发送到消息队列之前,都记录下该数据当前的SCN号,从而避免第 二线程崩溃而导致处理数据丢失,提高了实时数据集成的稳定性。

在上述技术方案中,优选地,步骤206还包括:通过第三线程将目标 语句批量写入目标数据库。

可以在目标语句积累到一定数量后,通过批量导入的方式,将目标语 句写入目标数据库,从而避免逐条写入语句而造成的频繁交互,提高了实 时数据集成的效率。

在上述技术方案中,优选地,所述第一线程、所述第二线程和所述第 三线程是异步执行的。从而提高每步操作的同步率,进而提高实时数据集 成的效率。

其中的第一预设条件和第二预设条件,可以预先设置一张数据表来规 定所需的业务数据,对于即将实例化的数据和即将从缓存中发送至消息队 列的数据,可以判断其是否输入预先设置的表格中所规定的内容,若不属 于,说明该数据并不是用户所需,则可以直接删除,从而减少对无用数据 的处理,提高了实时数据集成的效率。

图3示出了根据本发明的实施例的数据集成系统的结构示意图。

如图3所示,根据本申请的实施例的数据集成系统是基于日志分析和 消息中间件技术(数据集成,简称RDI),内部具有高缓存,高并发的架 构。可以通过具体有以下功能的软件和/或硬件来实现:

Source DB:用于进行源端的日志挖掘,相当于日志挖掘单元;

RDI Source Agent:从源端数据库分析日志得到事务并发送至消息中 间件,相当于处理单元;

RDI Admin Console:用来管理数据集成的实例中的任务和资源;

Message Queue(简称MQ):消息队列中间件用于传递消息;

RDI Target Agent:从消息中间件取得事务并应用到目的端数据库;

Target DB:用于写入的目标数据库,RDI Target Agent和Target DB 相当于写入单元。

其相应的数据流向如图4所示,对源端数据库402进行日志挖掘得到 的数据放入缓存队列404,然后将缓存队列404中的数据发送至消息队列 406,最后将消息队列406中的数据写入目标数据库408。

图5示出了根据本发明的实施例的数据挖掘和发送至消息队列的示意 流程。

如图5所示,数据集成引擎启动时会开启一个线程(相当于第一线 程)专门用于进行日志挖掘,挖掘的对象是数据库的归档日志,挖掘的工 具使用的是oracle自带的logminer,主要是根据数据库的系统变更号进行 挖掘。线程启动后首先会向logminer里传递一个开始系统变更号和一个结 束系统变更号,logminer会根据挖掘的范围对归档日志进行挖掘(当然, 如果该挖掘操作发生在系统崩溃后,则可以根据记录下的SCN号进行日 志挖掘),挖掘出记录后会选择有效的字段内容并实例成对象,最后把实 例好的对象存入系统缓存中等待下一步处理。在这里所有发掘出来的内容 都存在于系统的缓存中因此如果发掘线程突然中断或者停止,则存在与缓 存中的内容都会丢失,带来的问题就是进行需要同步的操作数据丢失,因 此提出的改进方案就是把挖掘出来的数据持久化到本地的日志中,这样可 以方便在出错的时候对发掘的数据进行查询。

数据集成引擎在启动后会开启另一个线程(相当于第二线程)专门用 于把存在于系统缓存中的数据发送到消息队列中。它与日志挖掘的线程是 异步执行的,这样可以增加同步的效率。线程启动后会轮询系统的缓存队 列,如果发现有有效的实例化对象,会把该对象放入一个事务处理的对象 中,直到一个事务提交完毕后,线程会把该事务对象送入配置好的过滤器 进行过滤操作,没有被过滤掉的数据会通过发送通道发送到MQ(即 Message Queue)消息队列中,等待下一步处理。在这里提出的改进方案 是在每一个消息发送到队列时都记录下当前的SCN号以防止在系统崩溃 时已发掘出来的但还没有发送到消息队列的数据丢失。

图6示出了根据本发明的实施例的数据写入目标数据库的示意流程 图。

数据集成引擎在启动后会开启一个线程(相当于第三线程)用于获取 MQ消息队列中的数据并把其写入数据库。线程启动后会轮询MQ消息队 列,如果发现有消息,会把消息中的事务对象提取出来,然后传入事务拦 截器进行处理,最后提取出处理好的事务对象中的SQL语句并写入目标 数据库完成数据的同步过程。但目前数据导入的效率比较低,是因为数据 采用的是逐行导入的方式,改进方案是提供一个批量导入的方法,这样整 体同步的效率会增加,写入效率也会提升。

以上结合附图详细说明了本发明的技术方案,考虑到相关技术中,数 据集成的方法稳定性能比较差,并且效率也不是很高。通过本申请的技术 方案,能够有效地提高数据集成的稳定性和数据集成的效率。

在本发明中,术语“第一”、“第二”仅用于描述目的,而不能理解为指 示或暗示相对重要性。术语“多个”指两个或两个以上,除非另有明确的限 定。

以上所述仅为本发明的优选实施例而已,并不用于限制本发明,对于 本领域的技术人员来说,本发明可以有各种更改和变化。凡在本发明的精 神和原则之内,所作的任何修改、等同替换、改进等,均应包含在本发明 的保护范围之内。

去获取专利,查看全文>

相似文献

  • 专利
  • 中文文献
  • 外文文献
获取专利

客服邮箱:kefu@zhangqiaokeyan.com

京公网安备:11010802029741号 ICP备案号:京ICP备15016152号-6 六维联合信息科技 (北京) 有限公司©版权所有
  • 客服微信

  • 服务号