PDF
MySQl1ETLContents ....................................................................................... 1MySQl ......................................................................................... 1西亿1.bakMySQl使线线 MySQl2ETL Enginepublic class Engine { private static final Logger logger = LoggerFactory.getLogger(Engine.class); private final int threads; private final int totalCount; private final int batchSize; private final HikariDataSource source; private final HikariDataSource destination; private final AtomicLong rangeSelector = new AtomicLong(0); private final List<Thread> tasks = new ArrayList<>(); public Engine(int threads, int totalCount, int batchSize) { this.threads = threads; this.totalCount = totalCount; this.batchSize = batchSize; this.source = new HikariDataSource(new HikariConfig("src/main/resources/source.properties")); this.destination = new HikariDataSource(new HikariConfig("src/main/resources/destination.properties")); }线线public void run() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(threads); for (int i = 0; i < threads; i++) { Thread thread = new Thread(new Task(source, destination, totalCount, rangeSelector, batchSize, countDownLatch)); tasks.add(thread); } tasks.forEach(Thread::start); countDownLatch.countDown(); countDownLatch.await();} MySQl3线线Task线线public void run() { logger.info("ETL task {} running...", Thread.currentThread().getId()); try (Connection sourceConn = source.getConnection(); Connection destinationConn = destination.getConnection()) { while (true) { final long startId = rangeSelector.getAndAdd(batchSize); final long endId = startId + batchSize; if (startId > totalCount) { logger.info("Reached end of records:{} , task finished", startId); break; } job.doTransfer(sourceConn, destinationConn, startId, endId); } } catch (SQLException ex) { logger.error("Failed to get data source, ex"); } finally { logger.info("ETL task {} finished.", Thread.currentThread().getId()); countDownLatch.countDown(); }}线Jobpublic class SeTransferJob implements TransferJob { private static final Logger logger = LoggerFactory.getLogger(SeTransferJob.class); private static final String query = "select * from sgk where id >? and id <=?"; private static final String insertSqlTemplate = "insert into se_record_%d(id, user_name, email, password, salt, source, remark) values (?,?,?,?,?,?,?);"; @Override public void doTransfer(Connection source, Connection target, long startId, long endId) { logger.info("Transferring records [{}, {}]", startId, endId); MySQl4 try { target.setAutoCommit(false); } catch (SQLException throwables) { throw new RuntimeException("Cannot open transaction"); } long tableIndex = (startId / 10000000) + 1; String insertSql = String.format(insertSqlTemplate, tableIndex); try (PreparedStatement queryStatement = source.prepareStatement(query); PreparedStatement saveStatement = target.prepareStatement(insertSql)) { queryStatement.setLong(1, startId); queryStatement.setLong(2, endId); try (ResultSet result = queryStatement.executeQuery()) { while (result.next()) { Record record = Record.from(result); if (!record.isValid()) { logger.warn("Found invaid record:{}", record); } else { record.attach(saveStatement); saveStatement.addBatch(); } } } saveStatement.executeBatch(); target.commit(); logger.info("Records [{}, {}] -> se_record_{} commited", startId, endId, tableIndex); } catch (SQLException ex) { logger.error("Unexpected sql error:", ex); throw new RuntimeException(ex); } }}

HTML view coming soon.

Download PDF for the full formatted version.