WhileOps.java revision 12745:f068a4ffddd2
1/* 2 * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25package java.util.stream; 26 27import java.util.Comparator; 28import java.util.Objects; 29import java.util.Spliterator; 30import java.util.concurrent.CountedCompleter; 31import java.util.concurrent.atomic.AtomicBoolean; 32import java.util.function.Consumer; 33import java.util.function.DoubleConsumer; 34import java.util.function.DoublePredicate; 35import java.util.function.IntConsumer; 36import java.util.function.IntFunction; 37import java.util.function.IntPredicate; 38import java.util.function.LongConsumer; 39import java.util.function.LongPredicate; 40import java.util.function.Predicate; 41 42/** 43 * Factory for instances of a takeWhile and dropWhile operations 44 * that produce subsequences of their input stream. 
45 * 46 * @since 1.9 47 */ 48final class WhileOps { 49 50 static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT; 51 52 static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED; 53 54 /** 55 * Appends a "takeWhile" operation to the provided Stream. 56 * 57 * @param <T> the type of both input and output elements 58 * @param upstream a reference stream with element type T 59 * @param predicate the predicate that returns false to halt taking. 60 */ 61 static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream, 62 Predicate<? super T> predicate) { 63 Objects.requireNonNull(predicate); 64 return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) { 65 @Override 66 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, 67 Spliterator<P_IN> spliterator) { 68 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 69 return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) 70 .spliterator(); 71 } 72 else { 73 return new UnorderedWhileSpliterator.OfRef.Taking<>( 74 helper.wrapSpliterator(spliterator), false, predicate); 75 } 76 } 77 78 @Override 79 <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 80 Spliterator<P_IN> spliterator, 81 IntFunction<T[]> generator) { 82 return new TakeWhileTask<>(this, helper, spliterator, generator) 83 .invoke(); 84 } 85 86 @Override 87 Sink<T> opWrapSink(int flags, Sink<T> sink) { 88 return new Sink.ChainedReference<T, T>(sink) { 89 boolean take = true; 90 91 @Override 92 public void begin(long size) { 93 downstream.begin(-1); 94 } 95 96 @Override 97 public void accept(T t) { 98 if (take = predicate.test(t)) { 99 downstream.accept(t); 100 } 101 } 102 103 @Override 104 public boolean cancellationRequested() { 105 return !take || downstream.cancellationRequested(); 106 } 107 }; 108 } 109 }; 110 } 111 112 /** 113 * Appends a "takeWhile" operation to the provided IntStream. 
114 * 115 * @param upstream a reference stream with element type T 116 * @param predicate the predicate that returns false to halt taking. 117 */ 118 static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream, 119 IntPredicate predicate) { 120 Objects.requireNonNull(predicate); 121 return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) { 122 @Override 123 <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, 124 Spliterator<P_IN> spliterator) { 125 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 126 return opEvaluateParallel(helper, spliterator, Integer[]::new) 127 .spliterator(); 128 } 129 else { 130 return new UnorderedWhileSpliterator.OfInt.Taking( 131 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); 132 } 133 } 134 135 @Override 136 <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 137 Spliterator<P_IN> spliterator, 138 IntFunction<Integer[]> generator) { 139 return new TakeWhileTask<>(this, helper, spliterator, generator) 140 .invoke(); 141 } 142 143 @Override 144 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 145 return new Sink.ChainedInt<Integer>(sink) { 146 boolean take = true; 147 148 @Override 149 public void begin(long size) { 150 downstream.begin(-1); 151 } 152 153 @Override 154 public void accept(int t) { 155 if (take = predicate.test(t)) { 156 downstream.accept(t); 157 } 158 } 159 160 @Override 161 public boolean cancellationRequested() { 162 return !take || downstream.cancellationRequested(); 163 } 164 }; 165 } 166 }; 167 } 168 169 /** 170 * Appends a "takeWhile" operation to the provided LongStream. 171 * 172 * @param upstream a reference stream with element type T 173 * @param predicate the predicate that returns false to halt taking. 
174 */ 175 static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream, 176 LongPredicate predicate) { 177 Objects.requireNonNull(predicate); 178 return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) { 179 @Override 180 <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, 181 Spliterator<P_IN> spliterator) { 182 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 183 return opEvaluateParallel(helper, spliterator, Long[]::new) 184 .spliterator(); 185 } 186 else { 187 return new UnorderedWhileSpliterator.OfLong.Taking( 188 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); 189 } 190 } 191 192 @Override 193 <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 194 Spliterator<P_IN> spliterator, 195 IntFunction<Long[]> generator) { 196 return new TakeWhileTask<>(this, helper, spliterator, generator) 197 .invoke(); 198 } 199 200 @Override 201 Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 202 return new Sink.ChainedLong<Long>(sink) { 203 boolean take = true; 204 205 @Override 206 public void begin(long size) { 207 downstream.begin(-1); 208 } 209 210 @Override 211 public void accept(long t) { 212 if (take = predicate.test(t)) { 213 downstream.accept(t); 214 } 215 } 216 217 @Override 218 public boolean cancellationRequested() { 219 return !take || downstream.cancellationRequested(); 220 } 221 }; 222 } 223 }; 224 } 225 226 /** 227 * Appends a "takeWhile" operation to the provided DoubleStream. 228 * 229 * @param upstream a reference stream with element type T 230 * @param predicate the predicate that returns false to halt taking. 
231 */ 232 static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream, 233 DoublePredicate predicate) { 234 Objects.requireNonNull(predicate); 235 return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) { 236 @Override 237 <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, 238 Spliterator<P_IN> spliterator) { 239 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 240 return opEvaluateParallel(helper, spliterator, Double[]::new) 241 .spliterator(); 242 } 243 else { 244 return new UnorderedWhileSpliterator.OfDouble.Taking( 245 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); 246 } 247 } 248 249 @Override 250 <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 251 Spliterator<P_IN> spliterator, 252 IntFunction<Double[]> generator) { 253 return new TakeWhileTask<>(this, helper, spliterator, generator) 254 .invoke(); 255 } 256 257 @Override 258 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 259 return new Sink.ChainedDouble<Double>(sink) { 260 boolean take = true; 261 262 @Override 263 public void begin(long size) { 264 downstream.begin(-1); 265 } 266 267 @Override 268 public void accept(double t) { 269 if (take = predicate.test(t)) { 270 downstream.accept(t); 271 } 272 } 273 274 @Override 275 public boolean cancellationRequested() { 276 return !take || downstream.cancellationRequested(); 277 } 278 }; 279 } 280 }; 281 } 282 283 /** 284 * A specialization for the dropWhile operation that controls if 285 * elements to be dropped are counted and passed downstream. 286 * <p> 287 * This specialization is utilized by the {@link TakeWhileTask} for 288 * pipelines that are ordered. In such cases elements cannot be dropped 289 * until all elements have been collected. 
290 * 291 * @param <T> the type of both input and output elements 292 */ 293 interface DropWhileOp<T> { 294 /** 295 * Accepts a {@code Sink} which will receive the results of this 296 * dropWhile operation, and return a {@code DropWhileSink} which 297 * accepts 298 * elements and which performs the dropWhile operation passing the 299 * results to the provided {@code Sink}. 300 * 301 * @param sink sink to which elements should be sent after processing 302 * @param retainAndCountDroppedElements true if elements to be dropped 303 * are counted and passed to the sink, otherwise such elements 304 * are actually dropped and not passed to the sink. 305 * @return a dropWhile sink 306 */ 307 DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements); 308 } 309 310 /** 311 * A specialization for a dropWhile sink. 312 * 313 * @param <T> the type of both input and output elements 314 */ 315 interface DropWhileSink<T> extends Sink<T> { 316 /** 317 * @return the could of elements that would have been dropped and 318 * instead were passed downstream. 319 */ 320 long getDropCount(); 321 } 322 323 /** 324 * Appends a "dropWhile" operation to the provided Stream. 325 * 326 * @param <T> the type of both input and output elements 327 * @param upstream a reference stream with element type T 328 * @param predicate the predicate that returns false to halt dropping. 329 */ 330 static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream, 331 Predicate<? 
super T> predicate) { 332 Objects.requireNonNull(predicate); 333 334 class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> { 335 public Op(AbstractPipeline<?, T, ?> upstream, StreamShape inputShape, int opFlags) { 336 super(upstream, inputShape, opFlags); 337 } 338 339 @Override 340 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, 341 Spliterator<P_IN> spliterator) { 342 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 343 return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) 344 .spliterator(); 345 } 346 else { 347 return new UnorderedWhileSpliterator.OfRef.Dropping<>( 348 helper.wrapSpliterator(spliterator), false, predicate); 349 } 350 } 351 352 @Override 353 <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 354 Spliterator<P_IN> spliterator, 355 IntFunction<T[]> generator) { 356 return new DropWhileTask<>(this, helper, spliterator, generator) 357 .invoke(); 358 } 359 360 @Override 361 Sink<T> opWrapSink(int flags, Sink<T> sink) { 362 return opWrapSink(sink, false); 363 } 364 365 public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) { 366 class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> { 367 long dropCount; 368 boolean take; 369 370 OpSink() { 371 super(sink); 372 } 373 374 @Override 375 public void accept(T t) { 376 boolean takeElement = take || (take = !predicate.test(t)); 377 378 // If ordered and element is dropped increment index 379 // for possible future truncation 380 if (retainAndCountDroppedElements && !takeElement) 381 dropCount++; 382 383 // If ordered need to process element, otherwise 384 // skip if element is dropped 385 if (retainAndCountDroppedElements || takeElement) 386 downstream.accept(t); 387 } 388 389 @Override 390 public long getDropCount() { 391 return dropCount; 392 } 393 } 394 return new OpSink(); 395 } 396 } 397 return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS); 398 } 399 400 
/** 401 * Appends a "dropWhile" operation to the provided IntStream. 402 * 403 * @param upstream a reference stream with element type T 404 * @param predicate the predicate that returns false to halt dropping. 405 */ 406 static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream, 407 IntPredicate predicate) { 408 Objects.requireNonNull(predicate); 409 class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> { 410 public Op(AbstractPipeline<?, Integer, ?> upstream, StreamShape inputShape, int opFlags) { 411 super(upstream, inputShape, opFlags); 412 } 413 414 @Override 415 <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, 416 Spliterator<P_IN> spliterator) { 417 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 418 return opEvaluateParallel(helper, spliterator, Integer[]::new) 419 .spliterator(); 420 } 421 else { 422 return new UnorderedWhileSpliterator.OfInt.Dropping( 423 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); 424 } 425 } 426 427 @Override 428 <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 429 Spliterator<P_IN> spliterator, 430 IntFunction<Integer[]> generator) { 431 return new DropWhileTask<>(this, helper, spliterator, generator) 432 .invoke(); 433 } 434 435 @Override 436 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 437 return opWrapSink(sink, false); 438 } 439 440 public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) { 441 class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> { 442 long dropCount; 443 boolean take; 444 445 OpSink() { 446 super(sink); 447 } 448 449 @Override 450 public void accept(int t) { 451 boolean takeElement = take || (take = !predicate.test(t)); 452 453 // If ordered and element is dropped increment index 454 // for possible future truncation 455 if (retainAndCountDroppedElements && !takeElement) 456 
dropCount++; 457 458 // If ordered need to process element, otherwise 459 // skip if element is dropped 460 if (retainAndCountDroppedElements || takeElement) 461 downstream.accept(t); 462 } 463 464 @Override 465 public long getDropCount() { 466 return dropCount; 467 } 468 } 469 return new OpSink(); 470 } 471 } 472 return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS); 473 } 474 475 /** 476 * Appends a "dropWhile" operation to the provided LongStream. 477 * 478 * @param upstream a reference stream with element type T 479 * @param predicate the predicate that returns false to halt dropping. 480 */ 481 static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream, 482 LongPredicate predicate) { 483 Objects.requireNonNull(predicate); 484 class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> { 485 public Op(AbstractPipeline<?, Long, ?> upstream, StreamShape inputShape, int opFlags) { 486 super(upstream, inputShape, opFlags); 487 } 488 489 @Override 490 <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, 491 Spliterator<P_IN> spliterator) { 492 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 493 return opEvaluateParallel(helper, spliterator, Long[]::new) 494 .spliterator(); 495 } 496 else { 497 return new UnorderedWhileSpliterator.OfLong.Dropping( 498 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); 499 } 500 } 501 502 @Override 503 <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 504 Spliterator<P_IN> spliterator, 505 IntFunction<Long[]> generator) { 506 return new DropWhileTask<>(this, helper, spliterator, generator) 507 .invoke(); 508 } 509 510 @Override 511 Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 512 return opWrapSink(sink, false); 513 } 514 515 public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) { 516 class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> { 517 long 
dropCount; 518 boolean take; 519 520 OpSink() { 521 super(sink); 522 } 523 524 @Override 525 public void accept(long t) { 526 boolean takeElement = take || (take = !predicate.test(t)); 527 528 // If ordered and element is dropped increment index 529 // for possible future truncation 530 if (retainAndCountDroppedElements && !takeElement) 531 dropCount++; 532 533 // If ordered need to process element, otherwise 534 // skip if element is dropped 535 if (retainAndCountDroppedElements || takeElement) 536 downstream.accept(t); 537 } 538 539 @Override 540 public long getDropCount() { 541 return dropCount; 542 } 543 } 544 return new OpSink(); 545 } 546 } 547 return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS); 548 } 549 550 /** 551 * Appends a "dropWhile" operation to the provided DoubleStream. 552 * 553 * @param upstream a reference stream with element type T 554 * @param predicate the predicate that returns false to halt dropping. 555 */ 556 static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream, 557 DoublePredicate predicate) { 558 Objects.requireNonNull(predicate); 559 class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> { 560 public Op(AbstractPipeline<?, Double, ?> upstream, StreamShape inputShape, int opFlags) { 561 super(upstream, inputShape, opFlags); 562 } 563 564 @Override 565 <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, 566 Spliterator<P_IN> spliterator) { 567 if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 568 return opEvaluateParallel(helper, spliterator, Double[]::new) 569 .spliterator(); 570 } 571 else { 572 return new UnorderedWhileSpliterator.OfDouble.Dropping( 573 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); 574 } 575 } 576 577 @Override 578 <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 579 Spliterator<P_IN> spliterator, 580 IntFunction<Double[]> generator) { 581 return new 
DropWhileTask<>(this, helper, spliterator, generator) 582 .invoke(); 583 } 584 585 @Override 586 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 587 return opWrapSink(sink, false); 588 } 589 590 public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) { 591 class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> { 592 long dropCount; 593 boolean take; 594 595 OpSink() { 596 super(sink); 597 } 598 599 @Override 600 public void accept(double t) { 601 boolean takeElement = take || (take = !predicate.test(t)); 602 603 // If ordered and element is dropped increment index 604 // for possible future truncation 605 if (retainAndCountDroppedElements && !takeElement) 606 dropCount++; 607 608 // If ordered need to process element, otherwise 609 // skip if element is dropped 610 if (retainAndCountDroppedElements || takeElement) 611 downstream.accept(t); 612 } 613 614 @Override 615 public long getDropCount() { 616 return dropCount; 617 } 618 } 619 return new OpSink(); 620 } 621 } 622 return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS); 623 } 624 625 // 626 627 /** 628 * A spliterator supporting takeWhile and dropWhile operations over an 629 * underlying spliterator whose covered elements have no encounter order. 630 * <p> 631 * Concrete subclasses of this spliterator support reference and primitive 632 * types for takeWhile and dropWhile. 633 * <p> 634 * For the takeWhile operation if during traversal taking completes then 635 * taking is cancelled globally for the splitting and traversal of all 636 * related spliterators. 637 * Cancellation is governed by a shared {@link AtomicBoolean} instance. A 638 * spliterator in the process of taking when cancellation occurs will also 639 * be cancelled but not necessarily immediately. To reduce contention on 640 * the {@link AtomicBoolean} instance, cancellation make be acted on after 641 * a small number of additional elements have been traversed. 
* <p>
     * For the dropWhile operation if during traversal dropping completes for
     * some, but not all elements, then it is cancelled globally for the
     * traversal of all related spliterators (splitting is not cancelled).
     * Cancellation is governed in the same manner as for the takeWhile
     * operation.
     *
     * @param <T> the type of elements returned by this spliterator
     * @param <T_SPLITR> the type of the spliterator
     */
    abstract static class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> {
        // Power of two constant minus one used for modulus of count
        static final int CANCEL_CHECK_COUNT = (1 << 6) - 1;

        // The underlying spliterator
        final T_SPLITR s;
        // True if no splitting should be performed, if true then
        // this spliterator may be used for an underlying spliterator whose
        // covered elements have an encounter order
        // See use in stream take/dropWhile default methods
        final boolean noSplitting;
        // True when operations are cancelled for all related spliterators
        // For taking, spliterators cannot be split or traversed
        // For dropping, spliterators cannot be traversed
        final AtomicBoolean cancel;
        // True while taking or dropping should be performed when traversing
        boolean takeOrDrop = true;
        // The count of elements traversed
        int count;

        /**
         * Creates a root spliterator with its own, initially unset,
         * cancellation flag.
         *
         * @param s the underlying spliterator
         * @param noSplitting true if {@link #trySplit()} must always return null
         */
        UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) {
            this.s = s;
            this.noSplitting = noSplitting;
            this.cancel = new AtomicBoolean();
        }

        /**
         * Creates a spliterator related to {@code parent}: it inherits the
         * parent's splitting policy and shares the parent's cancellation flag.
         *
         * @param s the underlying spliterator covered by the new instance
         * @param parent the spliterator this instance was split from
         */
        UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) {
            this.s = s;
            this.noSplitting = parent.noSplitting;
            this.cancel = parent.cancel;
        }

        /** Delegates to the underlying spliterator's estimate. */
        @Override
        public long estimateSize() {
            return s.estimateSize();
        }

        /**
         * Reports the underlying characteristics with {@code SIZED} and
         * {@code SUBSIZED} cleared.
         */
        @Override
        public int characteristics() {
            // Size is not known
            return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
        }

        /** Always returns -1: the exact size is never known. */
        @Override
        public long getExactSizeIfKnown() {
            return -1L;
        }

        /** Delegates to the underlying spliterator. */
        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        /**
         * Splits the underlying spliterator, unless splitting is disabled,
         * and wraps the split-off part via {@link #makeSpliterator}.
         *
         * @return a wrapped spliterator covering the split-off elements, or
         *         null if splitting is disabled or the underlying
         *         spliterator did not split
         */
        @Override
        public T_SPLITR trySplit() {
            @SuppressWarnings("unchecked")
            T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit();
            return ls != null ? makeSpliterator(ls) : null;
        }

        /**
         * Decides whether traversal may continue, reading the shared
         * cancellation flag only when {@code count} is zero (its initial
         * value, and again each time the masked counter wraps).
         *
         * @return false only if {@code count} is zero and cancellation has
         *         been signalled, otherwise true
         */
        boolean checkCancelOnCount() {
            return count != 0 || !cancel.get();
        }

        /**
         * Wraps a split-off underlying spliterator in a new instance of the
         * concrete type that is related to this one.
         *
         * @param s the split-off underlying spliterator
         * @return the wrapping spliterator
         */
        abstract T_SPLITR makeSpliterator(T_SPLITR s);

        /**
         * Base for the reference-type variants. It acts as the
         * {@code Consumer} handed to the underlying spliterator, recording
         * the advanced element in {@code t}.
         */
        abstract static class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> {
            // The predicate deciding whether an element is taken or dropped
            final Predicate<? super T> p;
            // The element most recently advanced from the underlying spliterator
            T t;

            OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfRef(Spliterator<T> s, OfRef<T> parent) {
                super(s, parent);
                this.p = parent.p;
            }

            /** Records the element and advances the masked traversal counter. */
            @Override
            public void accept(T t) {
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            /** takeWhile over reference elements. */
            static final class Taking<T> extends OfRef<T> {
                Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator<T> s, Taking<T> parent) {
                    super(s, parent);
                }

                /**
                 * Passes the next element to {@code action} if taking is
                 * still in progress, cancellation has not been observed, an
                 * element is available and it satisfies the predicate.
                 * Otherwise taking ends for this spliterator; the shared
                 * flag is set only when the predicate rejected an element.
                 */
                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    }
                    else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator<T> trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator<T> makeSpliterator(Spliterator<T> s) {
                    return new Taking<>(s, this);
                }
            }

            /** dropWhile over reference elements. */
            static final class Dropping<T> extends OfRef<T> {
                Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator<T> s, Dropping<T> parent) {
                    super(s, parent);
                }

                /**
                 * On the first call, advances past elements while the
                 * predicate holds and cancellation has not been observed,
                 * then reports the element the loop stopped on, if any. All
                 * later calls delegate straight to the underlying
                 * spliterator.
                 */
                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    if (takeOrDrop) {
                        // The dropping phase runs at most once per spliterator
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    }
                    else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator<T> makeSpliterator(Spliterator<T> s) {
                    return new Dropping<>(s, this);
                }
            }
        }

        /**
         * Base for the {@code int} variants. It acts as the
         * {@code IntConsumer} handed to the underlying spliterator, recording
         * the advanced element in {@code t}.
         */
        abstract static class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt {
            // The predicate deciding whether an element is taken or dropped
            final IntPredicate p;
            // The element most recently advanced from the underlying spliterator
            int t;

            OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
                super(s, parent);
                this.p = parent.p;
            }

            /** Records the element and advances the masked traversal counter. */
            @Override
            public void accept(int t) {
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            /** takeWhile over {@code int} elements. */
            static final class Taking extends UnorderedWhileSpliterator.OfInt {
                Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
                    super(s, parent);
                }

                /**
                 * Passes the next element to {@code action} if taking is
                 * still in progress, cancellation has not been observed, an
                 * element is available and it satisfies the predicate.
                 * Otherwise taking ends for this spliterator; the shared
                 * flag is set only when the predicate rejected an element.
                 */
                @Override
                public boolean tryAdvance(IntConsumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    }
                    else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator.OfInt trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
                    return new Taking(s, this);
                }
            }

            /** dropWhile over {@code int} elements. */
            static final class Dropping extends UnorderedWhileSpliterator.OfInt {
                Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
                    super(s, parent);
                }

                /**
                 * On the first call, advances past elements while the
                 * predicate holds and cancellation has not been observed,
                 * then reports the element the loop stopped on, if any. All
                 * later calls delegate straight to the underlying
                 * spliterator.
                 */
                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (takeOrDrop) {
                        // The dropping phase runs at most once per spliterator
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    }
                    else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
                    return new Dropping(s, this);
                }
            }
        }

        /**
         * Base for the {@code long} variants. It acts as the
         * {@code LongConsumer} handed to the underlying spliterator,
         * recording the advanced element in {@code t}.
         */
        abstract static class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong {
            // The predicate deciding whether an element is taken or dropped
            final LongPredicate p;
            // The element most recently advanced from the underlying spliterator
            long t;

            OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
                super(s, parent);
                this.p = parent.p;
            }

            /** Records the element and advances the masked traversal counter. */
            @Override
            public void accept(long t) {
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            /** takeWhile over {@code long} elements. */
            static final class Taking extends UnorderedWhileSpliterator.OfLong {
                Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
                    super(s, parent);
                }

                /**
                 * Passes the next element to {@code action} if taking is
                 * still in progress, cancellation has not been observed, an
                 * element is available and it satisfies the predicate.
                 * Otherwise taking ends for this spliterator; the shared
                 * flag is set only when the predicate rejected an element.
                 */
                @Override
                public boolean tryAdvance(LongConsumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    }
                    else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator.OfLong trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
                    return new Taking(s, this);
                }
            }

            /** dropWhile over {@code long} elements. */
            static final class Dropping extends UnorderedWhileSpliterator.OfLong {
                Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
                    super(s, parent);
                }

                /**
                 * On the first call, advances past elements while the
                 * predicate holds and cancellation has not been observed,
                 * then reports the element the loop stopped on, if any. All
                 * later calls delegate straight to the underlying
                 * spliterator.
                 */
                @Override
                public boolean tryAdvance(LongConsumer action) {
                    if (takeOrDrop) {
                        // The dropping phase runs at most once per spliterator
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    }
                    else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
                    return new Dropping(s, this);
                }
            }
        }

        /**
         * Base for the {@code double} variants. It acts as the
         * {@code DoubleConsumer} handed to the underlying spliterator,
         * recording the advanced element in {@code t}.
         */
        abstract static class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble {
            // The predicate deciding whether an element is taken or dropped
            final DoublePredicate p;
            // The element most recently advanced from the underlying spliterator
            double t;

            OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
                super(s, parent);
                this.p = parent.p;
            }

            /** Records the element and advances the masked traversal counter. */
            @Override
            public void accept(double t) {
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            /** takeWhile over {@code double} elements. */
            static final class Taking extends UnorderedWhileSpliterator.OfDouble {
                Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
                    super(s, parent);
                }

                /**
                 * Passes the next element to {@code action} if taking is
                 * still in progress, cancellation has not been observed, an
                 * element is available and it satisfies the predicate.
                 * Otherwise taking ends for this spliterator; the shared
                 * flag is set only when the predicate rejected an element.
                 */
                @Override
                public boolean tryAdvance(DoubleConsumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    }
                    else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator.OfDouble trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
                    return new Taking(s, this);
                }
            }

            /** dropWhile over {@code double} elements. */
            static final class Dropping extends UnorderedWhileSpliterator.OfDouble {
                Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
                    super(s, parent);
                }

                /**
                 * On the first call, advances past elements while the
                 * predicate holds and cancellation has not been observed,
                 * then reports the element the loop stopped on, if any. All
                 * later calls delegate straight to the underlying
                 * spliterator.
                 */
                @Override
                public boolean tryAdvance(DoubleConsumer action) {
                    if (takeOrDrop) {
                        // The dropping phase runs at most once per spliterator
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    }
                    else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
                    return new Dropping(s, this);
                }
            }
        }
    }


    //

    /**
     * {@code ForkJoinTask} implementing takeWhile computation.
     * <p>
     * If the pipeline has encounter order then all tasks to the right of
     * a task where traversal was short-circuited are cancelled.
     * The results of completed (and cancelled) tasks are discarded.
     * The result of merging a short-circuited left task and right task (which
     * may or may not be short-circuited) is that left task.
     * <p>
     * If the pipeline has no encounter order then all tasks to the right of
     * a task where traversal was short-circuited are cancelled.
1153 * The results of completed (and possibly cancelled) tasks are not 1154 * discarded, as there is no need to throw away computed results. 1155 * The result of merging does not change if a left task was 1156 * short-circuited. 1157 * No attempt is made, once a leaf task stopped taking, for it to cancel 1158 * all other tasks, and further more, short-circuit the computation with its 1159 * result. 1160 * 1161 * @param Input element type to the stream pipeline 1162 * @param Output element type from the stream pipeline 1163 */ 1164 @SuppressWarnings("serial") 1165 private static final class TakeWhileTask<P_IN, P_OUT> 1166 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> { 1167 private final AbstractPipeline<P_OUT, P_OUT, ?> op; 1168 private final IntFunction<P_OUT[]> generator; 1169 private final boolean isOrdered; 1170 private long thisNodeSize; 1171 // True if a short-circuited 1172 private boolean shortCircuited; 1173 // True if completed, must be set after the local result 1174 private volatile boolean completed; 1175 1176 TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, 1177 PipelineHelper<P_OUT> helper, 1178 Spliterator<P_IN> spliterator, 1179 IntFunction<P_OUT[]> generator) { 1180 super(helper, spliterator); 1181 this.op = op; 1182 this.generator = generator; 1183 this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); 1184 } 1185 1186 TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { 1187 super(parent, spliterator); 1188 this.op = parent.op; 1189 this.generator = parent.generator; 1190 this.isOrdered = parent.isOrdered; 1191 } 1192 1193 @Override 1194 protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { 1195 return new TakeWhileTask<>(this, spliterator); 1196 } 1197 1198 @Override 1199 protected final Node<P_OUT> getEmptyResult() { 1200 return Nodes.emptyNode(op.getOutputShape()); 1201 } 1202 1203 @Override 1204 protected final Node<P_OUT> 
doLeaf() { 1205 Node.Builder<P_OUT> builder = helper.makeNodeBuilder(-1, generator); 1206 Sink<P_OUT> s = op.opWrapSink(helper.getStreamAndOpFlags(), builder); 1207 1208 if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) { 1209 // Cancel later nodes if the predicate returned false 1210 // during traversal 1211 cancelLaterNodes(); 1212 } 1213 1214 Node<P_OUT> node = builder.build(); 1215 thisNodeSize = node.count(); 1216 return node; 1217 } 1218 1219 @Override 1220 public final void onCompletion(CountedCompleter<?> caller) { 1221 if (!isLeaf()) { 1222 Node<P_OUT> result; 1223 shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited; 1224 if (isOrdered && canceled) { 1225 thisNodeSize = 0; 1226 result = getEmptyResult(); 1227 } 1228 else if (isOrdered && leftChild.shortCircuited) { 1229 // If taking finished on the left node then 1230 // use the left node result 1231 thisNodeSize = leftChild.thisNodeSize; 1232 result = leftChild.getLocalResult(); 1233 } 1234 else { 1235 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; 1236 result = merge(); 1237 } 1238 1239 setLocalResult(result); 1240 } 1241 1242 completed = true; 1243 super.onCompletion(caller); 1244 } 1245 1246 Node<P_OUT> merge() { 1247 if (leftChild.thisNodeSize == 0) { 1248 // If the left node size is 0 then 1249 // use the right node result 1250 return rightChild.getLocalResult(); 1251 } 1252 else if (rightChild.thisNodeSize == 0) { 1253 // If the right node size is 0 then 1254 // use the left node result 1255 return leftChild.getLocalResult(); 1256 } 1257 else { 1258 // Combine the left and right nodes 1259 return Nodes.conc(op.getOutputShape(), 1260 leftChild.getLocalResult(), rightChild.getLocalResult()); 1261 } 1262 } 1263 1264 @Override 1265 protected void cancel() { 1266 super.cancel(); 1267 if (isOrdered && completed) 1268 // If the task is completed then clear the result, if any 1269 // to aid GC 1270 setLocalResult(getEmptyResult()); 1271 } 
1272 } 1273 1274 /** 1275 * {@code ForkJoinTask} implementing dropWhile computation. 1276 * <p> 1277 * If the pipeline has encounter order then each leaf task will not 1278 * drop elements but will obtain a count of the elements that would have 1279 * been otherwise dropped. That count is used as an index to track 1280 * elements to be dropped. Merging will update the index so it corresponds 1281 * to the index that is the end of the global prefix of elements to be 1282 * dropped. The root is truncated according to that index. 1283 * <p> 1284 * If the pipeline has no encounter order then each leaf task will drop 1285 * elements. Leaf tasks are ordinarily merged. No truncation of the root 1286 * node is required. 1287 * No attempt is made, once a leaf task stopped dropping, for it to cancel 1288 * all other tasks, and further more, short-circuit the computation with 1289 * its result. 1290 * 1291 * @param Input element type to the stream pipeline 1292 * @param Output element type from the stream pipeline 1293 */ 1294 @SuppressWarnings("serial") 1295 private static final class DropWhileTask<P_IN, P_OUT> 1296 extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> { 1297 private final AbstractPipeline<P_OUT, P_OUT, ?> op; 1298 private final IntFunction<P_OUT[]> generator; 1299 private final boolean isOrdered; 1300 private long thisNodeSize; 1301 // The index from which elements of the node should be taken 1302 // i.e. 
the node should be truncated from [takeIndex, thisNodeSize) 1303 // Equivalent to the count of dropped elements 1304 private long index; 1305 1306 DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, 1307 PipelineHelper<P_OUT> helper, 1308 Spliterator<P_IN> spliterator, 1309 IntFunction<P_OUT[]> generator) { 1310 super(helper, spliterator); 1311 assert op instanceof DropWhileOp; 1312 this.op = op; 1313 this.generator = generator; 1314 this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); 1315 } 1316 1317 DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { 1318 super(parent, spliterator); 1319 this.op = parent.op; 1320 this.generator = parent.generator; 1321 this.isOrdered = parent.isOrdered; 1322 } 1323 1324 @Override 1325 protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { 1326 return new DropWhileTask<>(this, spliterator); 1327 } 1328 1329 @Override 1330 protected final Node<P_OUT> doLeaf() { 1331 boolean isChild = !isRoot(); 1332 // If this not the root and pipeline is ordered and size is known 1333 // then pre-size the builder 1334 long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) 1335 ? 
op.exactOutputSizeIfKnown(spliterator) 1336 : -1; 1337 Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator); 1338 @SuppressWarnings("unchecked") 1339 DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op; 1340 // If this leaf is the root then there is no merging on completion 1341 // and there is no need to retain dropped elements 1342 DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild); 1343 helper.wrapAndCopyInto(s, spliterator); 1344 1345 Node<P_OUT> node = builder.build(); 1346 thisNodeSize = node.count(); 1347 index = s.getDropCount(); 1348 return node; 1349 } 1350 1351 @Override 1352 public final void onCompletion(CountedCompleter<?> caller) { 1353 if (!isLeaf()) { 1354 if (isOrdered) { 1355 index = leftChild.index; 1356 // If a contiguous sequence of dropped elements 1357 // include those of the right node, if any 1358 if (index == leftChild.thisNodeSize) 1359 index += rightChild.index; 1360 } 1361 1362 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; 1363 Node<P_OUT> result = merge(); 1364 setLocalResult(isRoot() ? doTruncate(result) : result); 1365 } 1366 1367 super.onCompletion(caller); 1368 } 1369 1370 private Node<P_OUT> merge() { 1371 if (leftChild.thisNodeSize == 0) { 1372 // If the left node size is 0 then 1373 // use the right node result 1374 return rightChild.getLocalResult(); 1375 } 1376 else if (rightChild.thisNodeSize == 0) { 1377 // If the right node size is 0 then 1378 // use the left node result 1379 return leftChild.getLocalResult(); 1380 } 1381 else { 1382 // Combine the left and right nodes 1383 return Nodes.conc(op.getOutputShape(), 1384 leftChild.getLocalResult(), rightChild.getLocalResult()); 1385 } 1386 } 1387 1388 private Node<P_OUT> doTruncate(Node<P_OUT> input) { 1389 return isOrdered 1390 ? input.truncate(index, input.count(), generator) 1391 : input; 1392 } 1393 } 1394} 1395