포스트

Stream 이란?

Stream 이란?

배경 : Stream은 왜 탄생했을까?

Stream API란?

Stream은 데이터의 흐름(데이터 파이프라인)을 선언적으로 처리하는 API다

  • Collection에 저장된 데이터에 대해
  • 반복, 필터링, 매핑, 집계 등을
  • 함수형 프로그래밍 방식으로 처리

전통적인 컬렉션처리 vs Stream API

Java 8이전에는 반복문 또는 Iterator를 통해 데이터를 처리했다

1
2
3
4
5
6
7
8
List<User> users = ...;
List<String> names = new ArrayList<>();

for (User user : users) {
    if (user.getAge() >= 30) {
        names.add(user.getName());
    }
}

문제점

  1. 반복문 + 조건문 + 수집 코드가 뒤섞여 있음

    → “무엇을” 하는지가 “어떻게” 구현됐는지에 묻힘

  2. 병렬 처리가 어렵고, 가독성이 떨어짐

    → 10줄 이상 걸리는 것도 많음

  3. 중간 가공을 추상화하기 힘듦

    → 메서드 체이닝이 안되니 재사용 어렵고 유지보수도 번거로움

Stream 방식의 변화

1
2
3
4
List<String> names = users.stream()
		.filter(user -> user.getAge() >= 30) // 조건 필터
		.map(User::getName)                  // 가공
		.collect(Collectors.toList());       // 수집

장점

  • “무엇을 할지”에 집중 (어떻게는 stream이 처리함)
  • 중간 가공 단게를 연속적으로 조립 가능
  • 3~4줄에 전체흐름이 요약됨
  • .parallelStream()으로 쉽게 병렬화 가능
  • 불변성 보장
    • 원본 데이터를 변경하지 않고 새 컬렉션 생성
  • 지연평가 지원

지연평가 (Lazy Evaluation)


지연평가란?

  • 지연평가란 스트림의 요소에 대한 연산이 실제로 필요할 때만 수행되는 방식을 의미한다. 보통 이는 최종 연산(Terminal Operaiton)이 호출되는 시점에 발생한다. 이는 연산이 곧바로 수행되는 즉시 평가(Eager Evaluation)과는 대조적인 개념이다.
  • Java Stream에서는 중간연산이 최종연산이 호출되기전까지 실행되지 않는다. 이러한 방식은 반복 횟수와 계산량을 줄일 수 있기 때문에, 특히 대용량 데이터셋을 처리할 때 성능을 최적화하는 데 효과적이다.

원리

  1. 스트림 초기화
    • stream()과 같이 컬렉션에서 스트림을 생성하면, JVM은 스트림의 초기 수정을 설정한다. 이 때 스트림은 소스 데이터(예: 컬렉션이나 배열)에 대한 참조를 저장한다.
  2. 중간연산
    • filter, map등의 중간 연산을 체이닝하면, JVM은 각각의 연산에 대해 새로운 스트림 객체를 생성하여 이전 스트림과 연결한다. 하지만 이 단계에서는 실제 계산이 전혀 이루어지지 않는다.
    • 즉 연산을 “등록”만 할 뿐, 실행은 미뤄둔다.
  3. 최종 연산 실행
    • 지연 평가는 최종 연산이 호출될 때 비로소 작동한다.
    • JVM은 소스부터 최종 연산까지 파이프라인을 따라 연산들을 적용하며 전체 스트림 처리를 시작한다.
  4. 요소 처리 방식
    • JVM은 성능과 메모리 사용을 최적화하기 위해 데이터를 한 요소씩 가져와 처리한다.
    • 이 방식은 특히 큰 컬렉션을 다룰 때 불필요한 데이터를 메모리에 올리지 않아도 되므로 효율적이다.
  5. 단락(short-circuit) 연산 지원
    • findFirst, limit과 같이 “조건을 만족하면 즉시 처리를 멈추는 연산(단락 연산)”의 경우, JVM은 조건이 충족되면 더 이상 데이터를 처리하지 않는다.
    • 이 덕분에 불필요한 계산을 줄일 수 있다.

예제


예제 1 : 기본적인 지연 연산 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
List<String> strings = Arrays.asList("one", "two", "three", "four");

Stream<String> longStringsStream = strings.stream().filter(s -> {
    System.out.println("Filtering: " + s);
    return s.length() > 3;
});

System.out.println("Stream created, filter not applied yet!");

longStringsStream.forEach(System.out::println);

/**
 * 결과
 * 
 * Stream created, filter not applied yet!
 * Filtering: one
 * Filtering: two
 * Filtering: three
 * three
 * Filtering: four
 * four
 */

🔎 설명 :

  • filter는 중간 연산으로 아직 실행되지 않음
  • forEach가 호출되는 최종 연산 시점에 그제서야 filter가 실행됨
  • 따라서 “Filtering: one 같은 로그는 forEach가 실행될때 출력됨

예제 2 : 여러 중간 연산 결합

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<String> strings = Arrays.asList("one", "two", "three", "four");

strings.stream()
      .filter(s -> {
          System.out.println("Filter: " + s);
          return s.length() > 3;
      })
      .map(s -> {
          System.out.println("Map: " + s);
          return s.toUpperCase();
      })
.forEach(s -> System.out.println("Processed: " + s));

/**
 * 결과
 *
 * Filter: one
 * Filter: two
 * Filter: three
 * Map: three
 * Processed: THREE
 * Filter: four
 * Map: four
 * Processed: FOUR
 */

🔎 설명 :

  • 이 스트림은 각 요소에 대해 filtermapforEach 순서로 동작
  • 하지만 최종 연산인 forEach가 실행되기 전까지는 어떤 연산도 실행되지 않음
  • 한 요소가 filter를 통과하면 바로 mapforEach로 처리됨 (한 번에 흐름 처리)

예제 3 : 무한 스트림

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Stream.iterate(0, n -> n + 1)
        .filter(n -> n % 2 == 0)
        .limit(10)
        .forEach(System.out::println);

/**
 * 결과
 * 
 * 0
 * 2
 * 4
 * 6
 * 8
 * 10
 * 12
 * 14
 * 16
 * 18
 */

🔎 설명 :

  • Stream.iterate는 무한 스트림을 생성함(0, 1, 2, 3, …)
  • 하지만 limit(10) 덕분에 처리는 처음 10개의 짝수까지만 진행
  • 이 역시 지원 평가 덕분에 메모리 폭발 없이 작동 가능

예제 4 : 최종 연산 (실행 트리거)

  • 지연 평가는 최종 연산이 호출될 때까지 계속 유지된다.
  • 최종 연산은 데이터를 실제로 처리하게 만드는 연산을 의미하며, 대표적으로 collect, forEach, reduce 등이 있다.
  • 최종 연산이 호출되면 다음과 같은 일이 발생한다.
    • JVM은 소스 데이터(예: 숫자 리스트)를 순회하기 시작한다.
    • 등록된 중간 연산(filter, map등)을 지정된 순서대로 하나씩 적용한다.
    • 그 결과가 계산되어 반환되거나, 최종 연산에서 지정된 동작이 수행된다.
1
List<Integer> result = filterdStream.collect(Collectors.toList());

위의 예제의 collect는 파이프라인 실행을 트리거하는 최종 연산이다. JVM은 소스 리스트를 순회하며 filter, map 등의 중간 연산을 적용하고, 결과를 새 리스트에 모은다.

예제 5 : 단락(Short-Circuition) 연산

  • Stream은 단락연산도 지원한다.
  • 이러한 연산은 특정 조건이 충족되면 더 이상의 처리를 중단한다.
  • 예를 들어 findFirst, findAny, limit 등이 이에 해당한다.
1
2
3
Optional<Integer> firstEven = numbers.stream
																	.filter(n -> n % 2 == 0)
																	.findFirst();

이 경우, 짝수를 처음 찾는 순간 스트림 처리가 즉시 멈춘다.

이는 불필요한 계산을 줄여 성능을 향상시킨다.

예제 6 : 병렬처리

  • Java의 Stream은 .parallelStream()을 통해 간단히 병렬 처리를 구현할 수 있다.
  • 하지만 병렬 처리가 항상 이점을 주는 것은 아니다.

아래는 JMH를 사용하여 stream()parallelStream()의 성능을 비교한 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package benchmark;

import org.openjdk.jmh.annotations.*;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Fork(1)
@Warmup(iterations = 2)
@BenchmarkMode(Mode.AverageTime)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class MyFirstBenchmark {

    private List<Integer > numbers;

    @Param({"1000", "1000000", "10000000"})
    private int size;

    @Setup(Level.Iteration)
    public void setUp(){
        numbers = IntStream.range(0, size)
                .boxed()
                .collect(Collectors.toList());
    }
    @Benchmark
    public long countEven_stream(){
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .count();
    }

    @Benchmark
    public long countEven_parallelStream() {
        return numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .count();
    }

    public static void main(String[] args) throws IOException {
        org.openjdk.jmh.Main.main(args);
  
/**
 * 결과
 * 
 * Benchmark                                    (size)  Mode  Cnt     Score      Error  Units
 * MyFirstBenchmark.countEven_parallelStream      1000  avgt    5    11.561 ±    1.398  us/op
 * MyFirstBenchmark.countEven_parallelStream   1000000  avgt    5   363.578 ±  113.900  us/op
 * MyFirstBenchmark.countEven_parallelStream  10000000  avgt    5  3719.376 ±  997.688  us/op
 * MyFirstBenchmark.countEven_stream              1000  avgt    5     0.512 ±    0.022  us/op
 * MyFirstBenchmark.countEven_stream           1000000  avgt    5   573.901 ±  111.868  us/op
 * MyFirstBenchmark.countEven_stream          10000000  avgt    5  7481.977 ± 1153.888  us/op
 */
    }
}

분석

  • 데이터가 적을 경우 (1,000)에는 parallelStream()이 오히려 성능이 더 나쁘다.
  • 중간 크기에서는 (1,000,000) 성능 차이가 미미하거나 오히려 병렬이 느릴 수 있다.
  • 데이터가 매우 클 경우 (10,000,000)에만 병렬 처리가 이점을 보인다.

결론

병렬 스트림은 다음과 같은 조건에서 사용하는 것이 효과적이다.

  • 데이터의 크기가 충분히 클 때
  • 각 연산이 CPU 자원을 많이 사용하는 복잡한 작업일 때
  • 순서에 의존하지 않는 연산일 때
  • 병렬 처리가 자연스럽게 분할 가능한 작업일 때
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.