훈훈훈
Spring Data :: Spring Data Elasticsearch rollover index 정리 본문
Spring Data :: Spring Data Elasticsearch rollover index 정리
훈훈훈 2021. 11. 26. 01:23Introduction
이번에 HighLevelRestClient 를 사용하여 운영하던 Indexer 어플리케이션에 Spring Data Elasticsearch 를 적용 해보게 되었다.
새로운 라이브러리를 적용 후 Alias 를 사용한 Rollover Index 기능 구현은 프레임워크 버전에 따라 구현 방법이 조금씩 다른 것 같아 적절한 레퍼런스를 찾기가 좀 어려웠었다.
다행히 스택오버플로우를 보다가 Spring Data Elasticsearch 프로젝트 리드하시는 분의 블로그에 좋은 글을 찾아 쉽게 해결할 수 있었고, 관련해서 내용을 정리해보려고 한다.
에제 코드는 Github 에서 확인해볼 수 있다.
Rollover Index
위 그림을 보듯이 요청을 alias 이름으로 받고 데이터는 alias 와 mapping 되어 있는 index 를 통해 데이터를 요청하는 것을 볼 수 있다.
따라서 클라이언트는 ES 에 존재하는 여러 index 의 이름을 알 필요는 없고 alias 를 통해 데이터를 받아오며, 실질적으로 gateway 역할을 한다고 볼 수 있다.
batch 등을 통해 데이터를 가공하여 index 를 새로 생성한다면 시간 값 등을 index 이름 뒤에 붙여주는 방식으로 index 를 생성하면 된다.
그리고 Alias 를 매핑해주면 된다.
Spring Data Elasticsearch
Spring Data Elasitcsearch 는 4.0 버전으로 올라오면서 많은 부분들이 변한 것 같다.
눈에 띄는 것은 ElasticsearchOperations 인터페이스이다. 해당 인터페이스를 호출하여 Elasticsearch 에 요청을 간편하게 보낼 수 있다.
이전에는 직접 생성한 쿼리를 HighLevelRestClient 를 사용하여 요청 보냈지만, ElasticsearchOperations 은 이를 wrapping 하여 간편하게 사용할 수 있도록 도와준다.
그리고 ElasticsearchOperations 인터페이스에서 사용하는 IndexOperations 인터페이스를 사용하면 인덱스 생성, 삭제, Alias 설정 등 다양한 기능을 지원해준다. 아래 이미지는 해당 인터페이스의 일부분만 갭쳐하였고, 실제로 확인해보면 더 많은 메소드들을 볼 수 있다.
그리고 Spring Data Elasitcsearch 4.1 버전에서도 바뀐 부분이 있는데 일단 IndexTemplate 을 지원하기 시작했다.
그리고 이번에 사용할 Alias 도 변경된 부분이 있는데, 기존에 사용한 AliasQuery 은 Deprecated 되고 AliasAction 을 사용하도록 권장하고 있다. (왜 AliasQuery 가 Deprecated 가 되었는지는 좀 더 찾아봐야될 것 같다....)
그리고 여러 메서드들이 추가가되었는데, 이번 예제에서 사용할 메서드인 alias( ), 그리고 getAliasesForIndex( ) 가 추가되었다.
Code Example
이제 코드 예제를 살펴보자.
먼저 아래와 같이 의존성을 추가한다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
}
Spring boot 는 2.6.0 버전을 사용하였을 때, 자동적으로 Spring data elasticsearch 는 4.3 버전이 추가되었다.
혹여나 Spring boot 버전이 낮더라도 2.3 버전 이상이면 starter-data-elasticsearch 가 아닌 직접 의존성을 추가하면 spring data elasticsearch 4.3 까지 적용이 되었다.
의존성이 정상적으로 적용이 된 이후에 아래와 같이 RestHighLevelClient 와 ElasticsearchOperations 를 Bean 으로 등록한다.
해당 코드는 Bealdung 글을 참고하였다.
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.hooon.dataes.repository")
@ComponentScan(basePackages = { "com.hooon.dataes.service" })
public class ElasticSearchConfig {
@Bean
public RestHighLevelClient client() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build();
return RestClients.create(clientConfiguration).rest();
}
@Bean
public ElasticsearchOperations elasticsearchTemplate() {
return new ElasticsearchRestTemplate(client());
}
}
예제로 사용할 아래 User Entity 클래스는 간단하게 필드 두 개로 만들었다.
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Table(name = "users")
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", nullable = false)
private Long id;
@Column(name = "name", nullable = false, length = 10)
private String name;
@Builder
public User(String name) {
this.name = name;
}
}
conertDocument 메서드는 아래에서 확인할 도큐먼트 클래스로 캐스팅하는 메소드이다.
실제로 필드만 매핑하는 거라면 modelMapper 로 대체하여도 무방하다.
이제 index 에 저장할 Doucument 객체를 만들자.
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Document(indexName = "user", createIndex = false)
public class UserDocument {
@Id
private String id;
@Field(type = FieldType.Text)
private String name;
}
@Document 어노테이션에는 생성할 인덱스 이름인 user 를 적었지만, 실제로는 index 이름이 아닌 alias 이름으로 사용할 것이다.
index 이름은 user-1637745625207, user-1637745625210 등 user 를 prefix 로 사용하려고 한다.
다시 정리하자면, user 라는 이름으로 index 를 생성하지 않을 것이며 그렇기 때문에 어플리케이션이 실행될 때 자동으로 인덱스를 생성하는 옵션인 createIndex 옵션은 false 로 설정한다.
아래 클래스는 인덱스 객체를 생성하는 유틸클래스이다.
public class IndexUtil {
public static IndexCoordinates createIndexNameWithPostFixWrapper(String indexName) {
return IndexCoordinates.of(indexName + "-" + Instant.now().toEpochMilli());
}
public static IndexCoordinates createIndexNameWrapper(String indexName) {
return IndexCoordinates.of(indexName);
}
}
인덱스 이름을 받으면 그대로 인덱스 객체를 생성하는 메서드와 뒤에 epochMilli 를 붙여주는 메서드 두 가지를 만들었다.
epochMilli 를 붙여주는 이유는 생성되는 인덱스마다 이름이 중복이 되지 않기 위함이다. 그래야 색인을 할 때마다 새로운 인덱스를 만들며 기존 인덱스랑 이름이 중복이 되지 않기 때문이다.
이제 공통으로 사용할 레포지토리를 만들어 보자.
@Repository
public interface BaseElasticSearchRepository<T> {
<S extends T> S save(S entity, IndexCoordinates indexName);
<S extends T> Iterable<S> saveAll(Iterable<S> entities, IndexCoordinates indexName);
boolean setAlias(IndexCoordinates indexNameWrapper, IndexCoordinates aliasNameWrapper);
Set<String> findIndexNamesByAlias(IndexCoordinates aliasNameWrapper);
boolean deleteIndex(IndexCoordinates indexNameWrapper);
}
위 예제는 해당 블로그에 있는 예제를 참고하였다. 참고한 예제는 하나의 Document 에 대한 케이스로 설명하고 있지만, 어플리케이션에 존재하는 모든 Document 객체에서 공통적으로 사용할 수 있는 기능이기 때문에 BaseElasticSearchRepository 로 네이밍하였다.
위 처럼 레포지토리 인터페이스를 생성한 목적은 기존에 존재하는 CrudRepository 에 정의되어 있는 save 메서드를 호출하면 @Document 에 설정한 index 이름에 저장을 한다.
하지만 우리가 필요한 것은 index + XXX 에 저장하는 것이 목표이기 때문에 새로운 레포지토리 인터페이스를 만들어야 한다.
아래와 같이 구현하였지만, alias 세팅, alias 로 연결 된 인덱스 찾기, 인덱스 삭제 같은 기능들도 공통적으로 사용할 것 같아 추가적으로 더 정의하였다.
아래 코드는 위에 작성한 인터페이스의 구현체이다.
@Repository
@RequiredArgsConstructor
public class BaseElasticSearchRepositoryImpl<T> implements BaseElasticSearchRepository<T> {
private final ElasticsearchOperations operations;
@Override
public <S extends T> S save(S entity, IndexCoordinates indexName) {
return operations.save(entity, indexName);
}
@Override
public <S extends T> Iterable<S> saveAll(Iterable<S> entities, IndexCoordinates indexName) {
return operations.save(entities, indexName);
}
@Override
public boolean setAlias(IndexCoordinates indexNameWrapper, IndexCoordinates aliasNameWrapper) {
IndexOperations indexOperations = operations.indexOps(indexNameWrapper);
AliasActions aliasActions = new AliasActions();
aliasActions.add(new AliasAction.Add(AliasActionParameters.builder()
.withIndices(indexOperations.getIndexCoordinates().getIndexNames())
.withAliases(aliasNameWrapper.getIndexName())
.build()));
return indexOperations.alias(aliasActions);
}
@Override
public Set<String> findIndexNamesByAlias(IndexCoordinates aliasNameWrapper) {
IndexOperations indexOperations = operations.indexOps(aliasNameWrapper);
return indexOperations.getAliasesForIndex(aliasNameWrapper.getIndexName()).keySet();
}
@Override
public boolean deleteIndex(IndexCoordinates indexNameWrapper) {
IndexOperations indexOperations = operations.indexOps(indexNameWrapper);
return indexOperations.delete();
}
}
Spring data elasticserach 4.0 버전에 추가 된 ElasticsearchOpertions 를 사용하여 로직을 구현하였다.
위에서도 언급하였지만, ElasticsearchOpertions 을 사용하면 HighLevelRestClient 로 직접 request 를 보내지 않고 간단하게 메서드만 호출하는 방법으로 Elasticsearch 에 요청을 보낼 수 있다.
색인만 할 때는 아래 인터페이스가 필요 없을 수도 있지만, 추후에 조회 기능도 함께 사용할 수 있도록 ElasticsearchRepository 그리고 위에서 생성한 인터페이스를 확장한 레포지토리 인터페이스를 만들었다.
@Repository
public interface UserDocumentRepository extends
ElasticsearchRepository<UserDocument, String>,
BaseElasticSearchRepository<UserDocument> {
List<UserDocument> findUserDocumentByName(String name);
}
해당 인터페이스로 서비스 레이어에서 저 인터페이스 하나로 위에서 정의한 BaseElasticSearchRepository 에 있는 메서드들과 findBy ~~ 와 같은 Spring data 프로젝트에서 지원하는 메서드 둘 다 사용할 수 있게 되었다.
이제 색인하는 로직을 살펴보자.
@Service
@RequiredArgsConstructor
public class UserIndexingService {
private final UserRepository userRepository;
private final UserDocumentRepository userDocumentRepository;
private final ModelMapper modelMapper;
private static final String INDEX_PREFIX_NAME ="user";
private static final String ALIAS_NAME = "user";
@PostConstruct
public void indexingUserDate() {
IndexCoordinates indexNameWrapper = IndexUtil.createIndexNameWithPostFixWrapper(INDEX_PREFIX_NAME);
IndexCoordinates aliasNameWrapper = IndexUtil.createIndexNameWrapper(ALIAS_NAME);
Set<String> existIndexNames = userDocumentRepository.findIndexNamesByAlias(aliasNameWrapper);
List<User> users = userRepository.findAll();
List<UserDocument> userDocuments = users.stream()
.map(user -> modelMapper.map(user, UserDocument.class))
.collect(Collectors.toList());
userDocumentRepository.saveAll(userDocuments, indexNameWrapper);
existIndexNames.forEach(indexName -> userDocumentRepository.deleteIndex(IndexUtil.createIndexNameWrapper(indexName)));
userDocumentRepository.setAlias(indexNameWrapper, aliasNameWrapper);
}
}
색인하는 과정은 간단하게 아래와 같은 절차로 실행이 된다.
1. user alias 에 매핑된 index 이름 조회
2. DB 에 있는 user 전체 데이터 조회 후 색인
3. (1)번에서 조회한 index 삭제
4. (2) 번에서 생성한 index 에 alias 매핑
실행시키먄 아래와 같이 정상적으로 데이터가 색인이 된 것을 볼 수 있다.
** 참고
https://github.com/sothawo/blog-sde-rolling-index
https://www.elastic.co/guide/en/elasticsearch/reference/6.8/indices-rollover-index.html
https://www.baeldung.com/spring-data-elasticsearch-tutorial
'Spring Framework > 개념' 카테고리의 다른 글
Spring Data :: Spring Data Elasticsearch refresh policy 정리 (0) | 2022.01.31 |
---|---|
Spring Data :: Spring Data Elasticsearch _class 필드 자동 생성 비활성화 (0) | 2021.11.27 |
Spring Cloud :: Spring cloud sleuth 정리 (0) | 2021.11.13 |
Spring boot :: Caffeine cache 정리 (2) | 2021.10.31 |
Spring boot :: Multiple DataSource 환경에서 @DataJpaTest 이슈 정리 및 스프링 코드 분석 (4) | 2021.10.11 |