참고 : https://jojoldu.tistory.com/473 (현 인프런 CTO 님, 이동욱 개발자 블로그)
개인적으로, SpringBatch에서 제공해주는 ItemReader의 경우 모두 텍스트 기반으로 하드 코딩 해야한다는 점이 큰 불만이었다.
현재는 JdbcItemReader를 사용하고 있는데, 많은 회사에서 QuerydslItemReader를 직접 구현하여 사용하고 있는 것을 확인하고 라이브러리가 존재하면 가져다가 사용할까? 싶기도 했지만 대부분이 SpringBatch 4 를 기반으로 작성되어 있고 SpringBoot2.xx 기반이라 오류가 발생하기에, 공부도 할겸, SpringBatch 5와 SpringBoot3.xx에 맞게 직접 만들어서 라이브러리로 만들어볼까 한다.
우선, 기본적으로 Chunk 지향 구조는 위와 같이 동작한다.
doReadPage()
page(offset)
와pageSize(limit)
을 통해서 데이터를 가지고 온다.read()
doReadPage()
로 가져온 데이터들을 하나씩 processor로 전달한다.doReadPage()
로 가져온 데이터를 모두 processor로 전달하면, 다음 페이지를 가져오도록 다시 doReadPage()
를 호출한다.위의 내용을 알아둔 채 진행하도록 하자.
SpringBatch의 AbstractPagingItemReader를 구현하는 QueryDslItemReader를 만들어 사용할 것인데,
이미 AbstractPagingItemReader를 구현하는 JpaPagingItemReader가 있으니 이를 수정하여 만들 것이다.
JpaPagingItemReader 에서 JPQL이 수행되는 부분이 doReadPage()
인데, 이 부분을 Querydsl의 쿼리를 수행하도록 변경하면 된다.
...
@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
EntityTransaction tx = null;
if (transacted) {
tx = entityManager.getTransaction();
tx.begin();
entityManager.flush();
entityManager.clear();
} // end if
Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
if (parameterValues != null) {
for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
query.setParameter(me.getKey(), me.getValue());
}
}
if (results == null) {
results = new CopyOnWriteArrayList<>();
}
else {
results.clear();
}
if (!transacted) {
List<T> queryResult = query.getResultList();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
} // end if
}
else {
results.addAll(query.getResultList());
tx.commit();
} // end if
}
...
이제, 이 JpaPagingItemReader를 수정하여 QueryDslPagingItemReader를 만들어 보자.
public class QueryDslPagingItemReader<T> extends AbstractPagingItemReader<T> {
protected final Map<String, Object> jpaPropertyMap = new HashMap<>();
protected EntityManagerFactory entityManagerFactory;
protected EntityManager entityManager;
protected Function<JPAQueryFactory, JPAQuery<T>> queryFunction;
private boolean transacted = true;// default value
public QueryDslPagingItemReader() {
setName(ClassUtils.getShortName(QueryDslPagingItemReader.class));
}
public QueryDslPagingItemReader(EntityManagerFactory entityManagerFactory, int pageSize,
Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
this();
this.entityManagerFactory = entityManagerFactory;
this.queryFunction = queryFunction;
setPageSize(pageSize);
}
/**
* Create a query using an appropriate query provider (entityManager OR
* queryProvider).
*/
/**
* 이 부분이 private이기 때문에 새롭게 작성하게 된 것! override할 수 없어서
*/
// private Query createQuery() {
// if (queryProvider == null) {
// return entityManager.createQuery(queryString);
// }
// else {
// return queryProvider.createQuery();
// }
// }
/**
* By default (true) the EntityTransaction will be started and committed around the
* read. Can be overridden (false) in cases where the JPA implementation doesn't
* support a particular transaction. (e.g. Hibernate with a JTA transaction). NOTE:
* may cause problems in guaranteeing the object consistency in the
* EntityManagerFactory.
* @param transacted indicator
*/
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
@Override
protected void doOpen() throws Exception {
super.doOpen();
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
}
}
@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
clearIfTransacted();
JPQLQuery<T> query = createQuery()
.offset(getPage() * getPageSize())
.limit(getPageSize());
initResults();
fetchQuery(query);
}
protected void clearIfTransacted() {
if (transacted) {
entityManager.clear();
}
}
protected JPAQuery<T> createQuery() {
JPAQueryFactory queryFactory = new JPAQueryFactory(entityManager);
return queryFunction.apply(queryFactory);
}
protected void initResults() {
if (CollectionUtils.isEmpty(results)) {
results = new ArrayList<>();
} else {
results.clear();
}
}
protected void fetchQuery(JPQLQuery<T> query) {
if (!transacted) {
List<T> queryResult = query.fetch();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
}
} else {
results.addAll(query.fetch());
}
}
@Override
protected void doClose() throws Exception {
entityManager.close();
super.doClose();
}
}
우선, Querydsl에서 람다 표현식으로 쿼리를 받기 위해 Function<JPAQueryFactory, JPAQuery<T>>
를 사용하였고, 이를 통해 JPAQueryFactory를 JPAQuery로 만들 수 있도록 하여 Reader에서 람다 표현식으로 원하는 쿼리를 만들 수 있도록 되어있다.
이제 이어서 메소드를 살펴보자.
doOpen()
@Override
protected void doOpen() throws Exception {
super.doOpen();
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
}
}
doOpen()
을 호출하면서 EntityManager가 세팅된다.doReadPage()
@Override
protected void doReadPage() {
clearIfTransacted();
JPAQuery<T> query = createQuery()
.offset(getPage() * getPageSize())
.limit(getPageSize());
initResults();
fetchQuery(query);
}
protected void clearIfTransacted() {
if (transacted) {
entityManager.clear();
}
}
protected JPAQuery<T> createQuery() {
JPAQueryFactory queryFactory = new JPAQueryFactory(entityManager);
return queryFunction.apply(queryFactory);
}
protected void initResults() {
if (CollectionUtils.isEmpty(results)) {
results = new ArrayList<>();
} else {
results.clear();
}
}
protected void fetchQuery(JPQLQuery<T> query) {
if (!transacted) {
List<T> queryResult = query.fetch();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
}
} else {
results.addAll(query.fetch());
}
}
이어서 doReadPage()
가 수행되는데, 현재의 페이지 * pageSize 만큼을 offset으로 하며 pageSize만큼만 조회한다.
그리고, clearIfTransacted()
가 뭔가 허전한 이유는 아래 자료를 참고하면 된다.
https://jojoldu.tistory.com/414 (기존의 코드는 fetchJoin이 적용되지 않아 n+1 문제가 발생한다고 한다.)