Blog:一个简单的ETL程序

来自WHY42

这两天闲着没事准备玩一下社工库,网上有很多以前的社工库(这些都可以下载到,但是实际上已经没有什么太大的价值了,因为暴露时间太久,以及相关的网站都已经做了处理,所以别指望能够找到什么有价值的东西),通过社工库可以了解到的一个实际的数据就是,用户的设置的密码大都是什么样子的。我准备看一下搜云社工库,这个库大概4亿多条数据,主要目的是实践一下大量数据的处理。


还原原始数据库

这个库(下载地址请自行搜索)下载完成之后是一个`1.bak`(SQL SERVER的备份文件)文件,接近30G,网上有详细的教程如何导入,总结起来大致如下:

  • 安装SQL SERVER 2008 R2(标准版或者更高)
  • 将1.bak复制到SQL SERVER的实例备份目录,否则会无法导入
  • 因为还原后的数据库占用空间在130G,所以要保证数据库的存储磁盘空间足够

还原完成之后即可得到一个sgk的数据表,里面有423771078条数据。

导入到MySQl

因为最终希望能够在MYSQL中使用数据,所以还得将数据迁移到MySQL中。最初的想法是直接将数据导出到csv,然后再通过MySQL导入csv。但是这个方法尝试了几次之后发现一些问题:

  • 数据中可能存在一些非可见字符,导致导出后的csv格式混乱
  • 莫名其妙的问题无法解析

索性就直接放弃了这种方式,而是通过手写ETL来完成迁移。于是设计了一个简陋的ETL程序,还挺有意思的。

整体架构

ETL的主要目的是把数据从一个地方转移迁移到另一个地方,这其中可能进行一些其他的操作比如清洗或者格式转换。整体的思路是这样的:

  • 有一个source(源数据)和destination(目标数据源)
  • ETL执行的时候从源数据查询数据,然后保存到目标数据库中,为了提高效率,每次会查询一批数据,保存的时候也会一批一批保存
  • 为进一步提高性能,ETL可以设定多个线程同时处理,为了避免冲突,每个线程不会处理同一条数据,是互斥的

ETL Engine

public class Engine {
    private static final Logger logger = LoggerFactory.getLogger(Engine.class);

    private final int threads;
    private final int totalCount;
    private final int batchSize;

    private final HikariDataSource source;
    private final HikariDataSource destination;

    private final AtomicLong rangeSelector = new AtomicLong(0);

    private final List<Thread> tasks = new ArrayList<>();

    public Engine(int threads, int totalCount, int batchSize) {
        this.threads = threads;
        this.totalCount = totalCount;
        this.batchSize = batchSize;
        this.source = new HikariDataSource(new HikariConfig("src/main/resources/source.properties"));
        this.destination = new HikariDataSource(new HikariConfig("src/main/resources/destination.properties"));
    }

这个ETL引擎担负管理任务的角色,有这些参数:

  • threads: 可以设定多少个线程同时运行;
  • totalCount: 设定一个totalCount作为任务结束的条件(当然也可以在查询不到数据的时候结束)
  • batchSize: 每一批的大小
  • rangeSelector: 用来控制每个线程处理的数据,这里将直接用id来区分

同时直接初始化了两个数据库的连接池,这里选用的是Hikari连接池。

public void run() throws InterruptedException {
    final CountDownLatch countDownLatch = new CountDownLatch(threads);
    for (int i = 0; i < threads; i++) {
        Thread thread = new Thread(new Task(source,
                destination,
                totalCount,
                rangeSelector,
                batchSize,
                countDownLatch));
        tasks.add(thread);
    }
    tasks.forEach(Thread::start);
    countDownLatch.countDown();
    countDownLatch.await();
}

核心逻辑就是,启动N个线程,然后一直等待到每个线程都结束,即完成了ETL任务。

Task

Task对应到每个线程,每个线程处理的任务是一样的,只是处理的数据记录不同。

public void run() {
    logger.info("ETL task {} running...", Thread.currentThread().getId());
    try (Connection sourceConn = source.getConnection();
         Connection destinationConn = destination.getConnection()) {
        while (true) {
            final long startId = rangeSelector.getAndAdd(batchSize);
            final long endId = startId + batchSize;
            if (startId > totalCount) {
                logger.info("Reached end of records:{} , task finished", startId);
                break;
            }
            job.doTransfer(sourceConn, destinationConn, startId, endId);
        }
    } catch (SQLException ex) {
        logger.error("Failed to get data source, ex");
    } finally {
        logger.info("ETL task {} finished.", Thread.currentThread().getId());
        countDownLatch.countDown();
    }

}

处理过程也很简单,拿到一个connection之后,根据rangeSelector得到要获取的id范围,然后委派给具体的Job去处理。当超出最大范围的时候,停止该线程。

Job

Job对应到具体每条数据该如何传输,会稍微麻烦一点。但本质还是select然后insert,没什么技术含量。

public class SeTransferJob implements TransferJob {
    private static final Logger logger = LoggerFactory.getLogger(SeTransferJob.class);
    private static final String query = "select * from sgk where id >? and id <=?";
    private static final String insertSqlTemplate = "insert into se_record_%d(id, user_name, email, password, salt, source, remark) values (?,?,?,?,?,?,?);";

    @Override
    public void doTransfer(Connection source, Connection target, long startId, long endId) {
        logger.info("Transferring records [{}, {}]", startId, endId);

        try {
            target.setAutoCommit(false);
        } catch (SQLException throwables) {
            throw new RuntimeException("Cannot open transaction");
        }
        long tableIndex = (startId / 10000000) + 1;
        String insertSql = String.format(insertSqlTemplate, tableIndex);
        try (PreparedStatement queryStatement = source.prepareStatement(query);
             PreparedStatement saveStatement = target.prepareStatement(insertSql)) {
            queryStatement.setLong(1, startId);
            queryStatement.setLong(2, endId);
            try (ResultSet result = queryStatement.executeQuery()) {
                while (result.next()) {
                    Record record = Record.from(result);
                    if (!record.isValid()) {
                        logger.warn("Found invaid record:{}", record);
                    } else {
                        record.attach(saveStatement);
                        saveStatement.addBatch();
                    }
                }
            }
            saveStatement.executeBatch();
            target.commit();
            logger.info("Records [{}, {}] -> se_record_{} commited", startId, endId, tableIndex);
        } catch (SQLException ex) {
            logger.error("Unexpected sql error:", ex);
            throw new RuntimeException(ex);
        }
    }
}

这里因为数据量太大,做了一个分区的处理,直接按照id来进行分区,每个表控制在千万以下。因此导入完成后,最终会有40多个表,每个表有接近1千万的数据。否则单表过大之后的插入性能会变得很低。

以上就是整个ETL的核心思想,还是有一定的扩展性的,哈哈。