병렬 스트림
컬렉션에 paralleStream을 호출하면
//일반 스트림 처리
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.reduce(0L, Long::sum);
}
//병렬 처리
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel() //스트림을 병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
스트림 성능 측정
병렬화를 이용하면 순차나 반복 형식에 비해 성능이 좋아질 것이라 추축되지만, 실제로는 그렇지 않다.
아래와 같이 1부터 n까지 합을 구하는 함수를 만들었다.
//전통적인 처리 : loop
public static long iterativeSum(long n) {
long result = 0;
for(long i = 1L; i<=n; i++) {
result += i;
}
return result;
}
//스트림 처리
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.reduce(0L, Long::sum);
}
//병렬 처리
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel() //스트림을 병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
그리고 아래는 각 함수의 처리 속도를 측정하는 함수이다.
//성능 측정
public long mesaure(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for(int i = 0; i<10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime()-start) / 1_000_000;
// System.out.println("Result: "+sum);
if(duration < fastest) fastest = duration;
}
return fastest;
}
@Test
public void test() {
System.out.println(mesaure(ParallTest::iterativeSum, 10_000_000)+" msecs");
System.out.println(mesaure(ParallTest::sequentialSum, 10_000_000)+" msecs");
System.out.println(mesaure(ParallTest::parallelSum, 10_000_000)+" msecs");
}
실행해 본 결과는 다음과 같다.
3 msecs //loop
100 msecs //스트림
78 msecs //병렬 스트림
기대한 바와는 다르게 for 루프가 가장 빠르다. for 루프는 기본값을 박싱하거나 언박싱할 필요가 없으므로 수행속도가 빠르다.
책(자바인액션)의 결과와는 다르게 나의 경우 병렬 스트림 처리가 일반 스트림 처리 보다 더 수행속도가 빠른 것으로 나왔다.
하지만 이런 경우 병렬 스트림처리가 느리다고 소개되어 있으니 그 이유를 정리 하고 가겠다.
- Stream.iterate가 박싱된 객체를 생성하므로 이를 다시 언박싱하는 과정이 필요했다.
- Stream.iterate는 병렬로 실행될 수 있도록 독립적인 청크로 분할하기가 어렵다.
두번째 이유를 자세히 설명하자면,
- 스트림 연산은 최종 연산이 진행되기 전까지는 연산을 미루는 Lazy 연산을 한다.
- 따라서 위 병렬처리 함수(parallelSum)로직에서 iterate함수가 stream내에서 이루어 짐으로 .reduce가 실행되기 전까지 전체 숫자 리스트가 준비 되지 않는다.
- 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없다.
스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다.
다음은 이러한 문제점을 개선한 함수이다.
//개선된 처리
public static long paralleRangeSum(long n) {
return LongStream.rangeClosed(1, n)
.parallel()
.reduce(0L, Long::sum);
}
처리 결과는 다음과 같이 가장 빨랐다.
@Test
public void test1() {
System.out.println(mesaure(ParallTest::paralleRangeSum, 10_000_000)+" msecs");
}
1 msecs
빨라진 이유는 다음과 같은 이유에서다.
- long타입에 특화된 LongStream을 사용함으로서 박싱과 언박싱 오버헤드가 사라진다.
- LongStream.rangeClosed는 쉽게 청크로 분할할 수 있도록 숫자 범위가 정해진다.
결론
병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림의 연산결과를 하나의 값으로 합쳐야 한다.
멀티코어간의 데이터 이동은 우리 생각보다 비싸서 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬화 하는 것이 좋다.
- 확신이 서지 않는다면 직접 측정하라.
- 박싱 : 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소다. 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.
- 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. - 특히 limit이나 findFirst처럼 요소의 순서에 의존하는
연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다. 예를 들어 findAny는 요소의 순서와 상관 없이 연산하므로
findFisrt보다 성능이 좋다. - 스트림에서 수행하는 전체 파이프라인 연산비용을 고려해라. 하나의 요소를 처리하는데 드는 비용이 커지면 커질 수록 병렬 스트림 으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
- 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다. 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
- 스트림을 구성하는 자료구조가 적절한지 확인하라.
예를 들면 ArrayList가 LinkedList보다 분할하기 낫고, range팩토리 메서드로 만든 기본형 스트림을 쓰는 것이 좋다. - 스트림의 중간연산이 스트림의 특성을 어떻게 바꾸는지도 생각해봐야 한다. filter연산이 있는 경우 스트림의 길이를 예측 할 수 없으므로 효과적으로 스트림을 병렬 처리 할 수 있을지 알 수 없게 된다.
- 최종 연산의 병합과정(예를 들면 Collector의 combiner 메서드) 비용을 살펴 보라. 병합 과정의 비용이 비싸다면
병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄될 수 있다.