添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

JdbcPagingItemReader分页和线程安全

通过源码来分析其分页和线程安全。

http://my.oschina.net/xinxingegeya/blog/347227

http://my.oschina.net/xinxingegeya/blog/344117

多线程的Step中的每一个commit-interval都是在不同的线程中执行的。在chunk的参与者中reader和writer或许是有状态的,要保证reader和writer在多线程中的线程安全性。那么这里的JdbcPagingItemReader是如何保证线程安全性的。

如下JdbcPagingItemReader的继承关系树:

当 SimpleChunkProvider 调用 itemReader.read() 方法时,如下,

* Surrounds the read call with listener callbacks.  * @return item  * @throws Exception protected final I doRead() throws Exception { try { listener.beforeRead(); I item = itemReader.read(); if(item != null) { listener.afterRead(item); return item; catch (Exception e) { logger.debug(e.getMessage() + " : " + e.getClass().getName()); listener.onReadError(e); throw e;

也就是调用 JdbcPagingItemReader实例的 read 方法,其方法实现在其父类 AbstractItemCountingItemStreamItemReader 中,如下:

@Override
public T read() throws Exception, UnexpectedInputException, ParseException {
	if (currentItemCount >= maxItemCount) {
		return null;
	currentItemCount++;
	T item = doRead();
	if(item instanceof ItemCountAware) {
		((ItemCountAware) item).setItemCount(currentItemCount);
	return item;
 

那么doRead() 方法又是谁实现的,AbstractPagingItemReader,doRead()方法如下:

@Override
protected T doRead() throws Exception {
	synchronized (lock) {
		if (results == null || current >= pageSize) {
			if (logger.isDebugEnabled()) {
				logger.debug("Reading page " + getPage());
			doReadPage();
			page++;
			if (current >= pageSize) {
				current = 0;
		int next = current++;
		if (next < results.size()) {
			return results.get(next);
		else {
			return null;
 

那么现在就可以找到 reader 同步的关键所在了,在doRead() 方法中使用了synchronized关键字做的线程间的同步处理,也就是只有一个线程获得这个lock ,从而执行这个方法。

那么接下来,看看doReadPage(); JdbcPagingItemReader是如何实现的,

@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
	if (results == null) {
		results = new CopyOnWriteArrayList<T>();
	else {
		results.clear();
	PagingRowMapper rowCallback = new PagingRowMapper();
	List<?> query;
	if (getPage() == 0) {
		if (logger.isDebugEnabled()) {
			logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
		if (parameterValues != null && parameterValues.size() > 0) {
			if (this.queryProvider.isUsingNamedParameters()) {
				query = namedParameterJdbcTemplate.query(firstPageSql,
						getParameterMap(parameterValues, null), rowCallback);
			else {
				query = getJdbcTemplate().query(firstPageSql,
						getParameterList(parameterValues, null).toArray(), rowCallback);
		else {
			query = getJdbcTemplate().query(firstPageSql, rowCallback);
	else {
		previousStartAfterValues = startAfterValues;
		if (logger.isDebugEnabled()) {
			logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
		if (this.queryProvider.isUsingNamedParameters()) {
			query = namedParameterJdbcTemplate.query(remainingPagesSql,
					getParameterMap(parameterValues, startAfterValues), rowCallback);
		else {
			query = getJdbcTemplate().query(remainingPagesSql,
					getParameterList(parameterValues, startAfterValues).toArray(), rowCallback);
	Collection<T> result = (Collection<T>) query;
	results.addAll(result);
 

以上就是分页的reader是如何实现同步的,以及整个方法调用链都完整的展现出来的,从中我们不仅可以看到同步的实现,更看到了spring 设计上的优雅,可以作为框架设计的典范。

接下来就是其如何实现分页的。

在JdbcPagingItemReader 对象中有两个属性变量如下:

remainingPagesSql "SELECT person_id, first_name, last_name FROM people WHERE ( first_name like :first_name or last_name like :last_name ) AND ((person_id > :_person_id)) ORDER BY person_id ASC LIMIT 2" (id=219)

‍‍firstPageSql‍‍ "SELECT person_id, first_name, last_name FROM people WHERE ( first_name like :first_name or last_name like :last_name ) ORDER BY person_id ASC LIMIT 2" (id=203)

通过 remainingPagesSql 这个变量你可以看到 在where 中 有这样一个限制条件 ((person_id > :_person_id)),那么这个 _person_id 参数表示什么意义,就是说上次最后读取的一行的 row id,其就是通过这个机制来保证不重复读的,LIMIT 2 就是实现分页的,这里分页的大小是 2;

还有一个事情没有弄明白,那JdbcPagingItemReader 是如何保存最后读取的 row id 呢,如下是其属性变量:

startAfterValues LinkedHashMap<K,V>  (id=126)

看一下是如何操作这个变量的,JdbcPagingItemReader的一个内部类:

private class PagingRowMapper implements RowMapper<T> {
	@Override
	public T mapRow(ResultSet rs, int rowNum) throws SQLException {
		startAfterValues = new LinkedHashMap<String, Object>();
		for (Map.Entry<String, Order> sortKey : queryProvider.getSortKeys().entrySet()) {
			startAfterValues.put(sortKey.getKey(), rs.getObject(sortKey.getKey()));
		return rowMapper.mapRow(rs, rowNum);
 

这是在JdbcPagingItemReader  类内部实现的 RowMapper 接口。

queryProvider.getSortKeys().entrySet(),这个就是通过配置文件中配置的 sortKey 来确定通过哪个列指定分页开始的位置。如下:

<property name="queryProvider">
		class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
		<property name="dataSource" ref="dataSource" />
		<property name="selectClause" value="select person_id, first_name, last_name" />
		<property name="fromClause" value="from people" />
		<property name="whereClause"
			value="where ( first_name like :first_name or last_name like :last_name ) " />
		<property name="sortKey" value="person_id" />
	</bean>
</property>

以上就其分页实现的过程。还是挺优雅的。这样分页的一个好处就是当job失败,需要重启,不需要担心中间数据增加或删除的情况。

===============END===============

2019独角兽企业重金招聘Python工程师标准&gt;&gt;&gt; ...
spring batch简介 spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。 这些业务运营包括: 无需用户交互即可最有效地处理大量信息的自动化,复杂处理。 这些操作通常包括基于时间的事件(例如月末计算,通知或通信)。 在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整)。 集成从内部和外部系统接收的信息,这些信息通常需要以事务方式格式化,验证和处理到记录系统中。 批处理用于每天为企业处理数十亿的
本篇博客是记录使用spring batch做数据迁移时时遇到的一个关键问题:数据迁移量大时如何保证内存。当我们在使用spring batch时,我们必须配置三个东西: reader,processor,和writer。其中,reader用于从数据库中读数据,当数据量较小时,reader的逻辑不会对内存带来太多压力,但是当我们要去读的数据量非常大的时候,我们就不得不考虑内存等方面的问题,因为若数据量非常大,内存,执行时间等等都会受到影响。 问题是什么 在上面的内容当中我们已经提到了,我们面临的问题是数据
spring batch ItemReader详解ItemReaderItemReaderItemStream系统读组件读数据库JdbcCursirItemReaderJdbcPagingItemReaderJpaPagingItemReaderJpaCursorItemReaderMyBatisCursorItemReaderMyBatisPagingItemReaderItemReader类图服务复用自定义ItemReader不可重启ItemReader可重启ItemReader拦截器接口异常执行顺序A
@Results(id = "userResultMap", value = { @Result(property = "id", column = "id", id = true), @Result(property = "name", column = "name"), package com.springbatch._09item_reader_from_db; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boo
只能靠写博客来鞭策自己学习了读取数据读取数据库的数据读取文件的数据读取多个文件的数据写入数据写到数据库写到文件写到多个文件 系列文章第四篇,学习一下 spring batch 的两个重要的功能,读数据(Reader)和写数据(Writer) 。 第一篇文章的传送门:Spring batch系列文章(一)—— 介绍和入门 第二篇的文章传送门:Spring batch系列文章(二)—— 核心api 第三篇的文章传送门:Spring batch系列文章(三)—— 决策器和监听器 读取数据库的数据 select * from paroduct limit 0,5; #第一页,每页显示5条 select * from paroduct limit 5,5; #第二页,每页显示5条 se...
分页可以在数据库中就分好,也可以先把所有结果取到内存再分页。 如果是数据量非常大,应该是最好在数据库中分好,显示多少取多少,如果一次性全部取出太多数据库,服务器压力大。 一、scroll是利用JDBC2.0的功能做分页的,那么就完全取决于特定数据库JDBC Driver的实现了。事实上大部分JDBC Driver都是把所有的结果集都一次取到内存,然后再分页的。如果这个结果集非常大,例如几万条
int pageSize = 10; int startRow = (currPage-1)*pageSize; StringBuffer countSql = new StringBuffer("select count(*) "); StringBuffer resSql = new StringBuffer("sele 这里对基础概念就不做过多的说明了,直接进入具体实现环节: 2.1 pom 文件 在 projecet 的 pom 文件中引入 spring-boot-starter-batch.jar,如下: <dependencies> <dependency> <groupId&g
Vue.js 是一个渐进式的 JavaScript 框架,而 Element UI 是一个基于 Vue.js 的组件库,其中有一个叫做 el-table 的组件可以用来实现分页表格。 通过在 el-table 上使用 "pagination" 属性来实现分页功能,并通过 "total" 属性来设置总条数,"page-size" 属性来设置每页显示的条数,"current-page" 属性来设置当前页码。 <el-table :data="tableData" pagination :total="total" :page-size="pageSize" :current-page="currentPage" @current-change="handleCurrentChange" <el-table-column prop="date" label="日期" ></el-table-column> <el-table-column prop="name" label="姓名" ></el-table-column> <el-table-column prop="address" label="地址" ></el-table-column> </el-table> 其中 "handleCurrentChange" 是在页码变化时触发的事件方法,在这个方法中你可以获取当前页码并请求对应数据。