MessageIterator.java (3028B) download
1package protobuf;
2
3import java.io.IOException;
4import java.io.InputStream;
5import java.util.ArrayList;
6import java.util.Iterator;
7import java.util.List;
8
9import protobuf.exception.InputException;
10import protobuf.exception.OverflowException;
11
12/**
13 * Iterator for parsing Protocol Buffers from an input stream.
14 *
15 * <p>
16 * The iterator reads and parses wire streams from the provided input stream and
17 * produces {@link ProtobufReader} instances, representing the parsed
18 * messages.
19 *
20 * @see ProtobufReader
21 */
22public class MessageIterator implements Iterator<ProtobufReader> {
23 /**
24 * The input stream from which the Protocol Buffers are read.
25 */
26 protected final InputStream input;
27
28 /**
29 * The remaining length of the input stream. It is decremented as messages are
30 * read.
31 */
32 protected int length;
33
34 /**
35 * A list of delayed operations to be executed when there are no more messages
36 * to read.
37 */
38 protected List<Runnable> delayed = new ArrayList<>();
39
40 /**
41 * Constructs a new MessageIterator with the given input stream and length.
42 *
43 * @param input The input stream containing the Protocol Buffers.
44 * @param length The initial length of the input stream.
45 */
46 public MessageIterator(InputStream input, int length) {
47 this.input = input;
48 this.length = length;
49 }
50
51 /**
52 * Reads a variable-length integer (varint) from the input stream.
53 *
54 * @return The parsed varint value.
55 * @throws OverflowException If the input exceeds the expected size.
56 * @throws InputException If an I/O error occurs during reading.
57 */
58 private int varint() {
59 int result = 0;
60 int b = 0;
61 int shift = 0;
62 while (shift < 32 && length > 0) {
63 try {
64 b = input.read();
65 } catch (IOException exc) {
66 throw new InputException(exc);
67 }
68 if (b == -1)
69 break;
70 length--;
71
72 result |= (b & 0x7f) << shift;
73 shift += 7;
74 if ((b & 0x80) == 0)
75 return result;
76 }
77 throw new OverflowException("input exceed");
78 }
79
80 /**
81 * Checks if there are more Protocol Buffers messages to be read.
82 *
83 * @return {@code true} if there are more messages; {@code false} otherwise.
84 */
85 @Override
86 public boolean hasNext() {
87 if (length == 0) {
88 delayed.forEach(Runnable::run);
89 delayed.clear();
90 }
91 return length > 0;
92 }
93
94 /**
95 * Reads the next wire stream and creates a {@link ProtobufReader}
96 * instance.
97 *
98 * @return The parsed {@link ProtobufReader} instance.
99 */
100 @Override
101 public ProtobufReader next() {
102 int tag = varint();
103
104 // least-significant 3 bits are the type, remaining is the tag value
105 // type = tag & 0b00000111
106 return new ProtobufReader(this, WireType.values()[tag & 0x07], tag >> 3);
107 }
108}