How to update/insert 100k CSV lines into a 10M-record MS SQL Server table from a cron job in a few seconds?
Context
In my application, there is a CUSTOMER table containing more than 10 million records. Every day, another application exports CRM data to a CSV file and uploads it to an S3 bucket. This CSV file usually contains about 30 thousand lines: more than 1 thousand of them are new customers, and the rest are existing customers in the CUSTOMER table whose information needs to be updated. A cron job reads the CSV file and updates/inserts it into the CUSTOMER table. The issue is that updating nearly 30k existing records one by one against a table that already holds 10M records takes a few hours to complete. After googling for solutions, I found the suggestion to use INSERT ON DUPLICATE KEY UPDATE. However, that syntax belongs to MySQL (PostgreSQL has the similar INSERT ... ON CONFLICT) and is not supported by MS SQL Server; instead, MS SQL Server offers the MERGE statement to solve this kind of problem. I applied it to my application and took notes in this short demo, so I can reuse it later and share it with anyone looking for a solution to a similar issue.
Idea
- Insert all CSV lines into a temporary table `CUSTOMER_TEMP` using the MyBatis batch insert.
- Call a SQL script via MyBatis ScriptRunner to merge data from `CUSTOMER_TEMP` into the `CUSTOMER` table, letting the database engine do the work internally.
In the real project I'm using spring-boot-starter-batch, but for demo purposes I only wrote a short Java class that reads the CSV file customers_100k_lines.csv from the current project folder instead of the S3 bucket, batch-inserts it into CUSTOMER_TEMP, and calls ScriptRunner to MERGE the data into the CUSTOMER table. The CSV file contains 100k lines: 50k are existing records in the CUSTOMER table that should be updated, and 50k are new ones that should be inserted. I also prepared two SQL scripts to create the CUSTOMER_TEMP table and a CUSTOMER table pre-populated with 10M records. If you want to run this source code, correct the database information in the application.yml file first; you can then check the total time spent, and the time spent on the batch insert and on the data merge, in the log files. Hope this is helpful for someone.
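The demo relies on a small MyBatis mapper, CustomerMapper, exposing three statements used below: insertTemp, count, and truncateTemp. The actual mapper lives in the GitHub repository linked at the end; the annotated sketch here is only an illustration, and the column list is assumed from the MERGE script further down.

import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

// Sketch only: the real project may define these statements in an XML mapper instead.
@Mapper
public interface CustomerMapper {

    // Insert one CSV row into the staging table; with ExecutorType.BATCH these
    // inserts are collected into JDBC batches and sent on flushStatements().
    @Insert("INSERT INTO CUSTOMER_TEMP (ID, FIRST_NAME, LAST_NAME, EMAIL, COUNTRY, APP_INSTALL) "
          + "VALUES (#{id}, #{firstName}, #{lastName}, #{email}, #{country}, #{appInstall})")
    void insertTemp(Customer customer);

    // Row count of the main table, used only for the before/after log lines.
    @Select("SELECT COUNT(*) FROM CUSTOMER")
    long count();

    // Empty the staging table once the MERGE has finished.
    @Delete("TRUNCATE TABLE CUSTOMER_TEMP")
    void truncateTemp();
}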
log.info("Start to update/insert 100k CSV lines to a database table contains 10M records."); | |
CSVReader csvReader = null; | |
SqlSession batchSession = null; | |
try { | |
File csvFile = new File("customers_100k_lines.csv"); | |
InputStream csvInputStream = new FileInputStream(csvFile); | |
csvReader = new CSVReaderBuilder(new InputStreamReader(csvInputStream)).withSkipLines(1).build(); | |
batchSession = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH, false); | |
CustomerMapper batchMapper = (CustomerMapper) batchSession.getMapper(CustomerMapper.class); | |
log.info(String.format("Total records in CUSTOMER table before: %d", batchMapper.count())); | |
long startTime = System.currentTimeMillis(); | |
int i = 0; | |
int skip = 0; | |
int inserted = 0; | |
String [] x; | |
// Read line by from the CSV Input stream | |
while ((x = csvReader.readNext()) != null) { | |
if (!StringUtils.isEmpty(x[0])) { | |
Customer c = null; | |
try { | |
c = transCustomer(x); | |
} catch (Exception e) { | |
skip++; | |
log.warn(String.format("Can not transfer from String: [ %s ] to a customer object."), String.join(",", x)); | |
} | |
if (null != c) { | |
try { | |
batchMapper.insertTemp(c); | |
i++; | |
} catch (Exception e) { | |
skip++; | |
log.error(String.format("Can not insert customer object: %s"), gson.toJson(c)); | |
} | |
} | |
} else { | |
skip++; | |
log.warn(String.format("Can not transfer from String: [ %s ] to a customer object."), String.join(",", x)); | |
} | |
if (i != 0 && i % BATCH_SIZE == 0) { | |
try { | |
List<BatchResult> batchResults = batchSession.flushStatements(); | |
inserted += batchResults.size(); | |
batchSession.commit(); | |
batchSession.clearCache(); | |
} catch (Exception e) { | |
log.error(e.getMessage()); | |
} | |
} | |
} | |
// Last batch | |
if (i != 0 && i % BATCH_SIZE != 0) { | |
try { | |
List<BatchResult> batchResults = batchSession.flushStatements(); | |
inserted += batchResults.size(); | |
batchSession.commit(); | |
batchSession.clearCache(); | |
} catch (Exception e) { | |
log.error(e.getMessage()); | |
} | |
} | |
long endTime = System.currentTimeMillis(); | |
log.info(String.format("Total CSV lines: %d. Total inserted to CUSTOMER_TEMP table: %d by %d batchs. Time spent: %d (ms).", (i + skip), i, inserted, (endTime - startTime))); | |
// Merge data from CUSTOMER_TEMP to CUSTOMER table. | |
Connection conn = batchSession.getConnection(); | |
ScriptRunner runner = new ScriptRunner(conn); | |
runner.setSendFullScript(true); | |
runner.setLogWriter(null); | |
Reader r = Resources.getResourceAsReader("sql-scripts/Merge_data_from_CUSTOMER_TEMP_into_CUSTOMER_table.sql"); | |
long startScript = System.currentTimeMillis(); | |
runner.runScript(r); | |
long endScript = System.currentTimeMillis(); | |
log.info(String.format("Time spent to merge data from CUSTOMER_TEMP to CUSTOMER table: %d (ms).", (endScript - startScript))); | |
log.info(String.format("Total records in CUSTOMER table after: %d", batchMapper.count())); | |
} catch (Exception e) { | |
log.error(e.getMessage()); | |
} finally { | |
if (null != batchSession) { | |
batchSession.close(); | |
} | |
if (null != csvReader) { | |
try { | |
csvReader.close(); | |
} catch (IOException e) { | |
log.error(e.getMessage()); | |
} | |
} | |
} | |
// Truncate table CUSTOMER_TEMP | |
SqlSession simpleSession = null; | |
try { | |
simpleSession = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.SIMPLE, false); | |
CustomerMapper simpleMapper = (CustomerMapper) simpleSession.getMapper(CustomerMapper.class); | |
simpleMapper.truncateTemp(); | |
simpleSession.commit(); | |
} catch (Exception e) { | |
log.error(e.getMessage()); | |
} finally { | |
if (null != simpleSession) { | |
simpleSession.close(); | |
} | |
} | |
log.info("End to update/insert 100k CSV lines to a database table contains 10M records."); |
MERGE CUSTOMER AS TARGET
USING CUSTOMER_TEMP AS SOURCE
ON (TARGET.ID = SOURCE.ID)
WHEN NOT MATCHED BY TARGET THEN
    INSERT (ID, FIRST_NAME, LAST_NAME, EMAIL, COUNTRY, APP_INSTALL)
    VALUES (SOURCE.ID, SOURCE.FIRST_NAME, SOURCE.LAST_NAME, SOURCE.EMAIL, SOURCE.COUNTRY, SOURCE.APP_INSTALL)
WHEN MATCHED THEN UPDATE SET
    TARGET.FIRST_NAME = SOURCE.FIRST_NAME,
    TARGET.LAST_NAME = SOURCE.LAST_NAME,
    TARGET.EMAIL = SOURCE.EMAIL,
    TARGET.COUNTRY = SOURCE.COUNTRY,
    TARGET.APP_INSTALL = SOURCE.APP_INSTALL;
Logs:
Total records in CUSTOMER table before: 10000000
Total CSV lines: 100000. Total inserted to CUSTOMER_TEMP table: 100000 by 10 batches. Time spent: 15589 (ms).
Time spent to merge data from CUSTOMER_TEMP to CUSTOMER table: 1325 (ms).
Total records in CUSTOMER table after: 10100000
Link to GitHub source code: https://github.com/koacervate/update-100k-csv-lines-to-10M-records-database-table
References:
https://ichihedge.wordpress.com/2020/01/12/mybatis-save-or-update/
https://www.sqlshack.com/understanding-the-sql-merge-statement/
https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?view=sql-server-ver15