BlobSpliterator.java (5511B) download
1package osm.protobuf;
2
3import java.io.IOException;
4import java.io.InputStream;
5import java.io.RandomAccessFile;
6import java.util.ArrayList;
7import java.util.List;
8import java.util.Spliterator;
9import java.util.concurrent.locks.Lock;
10import java.util.concurrent.locks.ReentrantLock;
11import java.util.function.Consumer;
12import java.util.stream.Stream;
13import java.util.stream.StreamSupport;
14
15import osm.common.RandomAccessFileInputStream;
16import osm.message.Blob;
17import osm.message.BlobHeader;
18import osm.message.Entity;
19import osm.message.HeaderBlock;
20
21/**
22 * A {@link Spliterator} implementation for iterating over lists of
23 * {@link Entity} objects parsed from OpenStreetMap blobs.
24 */
25public class BlobSpliterator implements Spliterator<List<Entity>> {
26 /**
27 * BlobHeader is never bigger than 64K.
28 */
29 private static final int MAX_HEADER_SIZE = 64 * 1024;
30
31 /**
32 * Blob is never bigger than 32M.
33 */
34 private static final int MAX_BLOB_SIZE = 32 * 1024 * 1024;
35
36 private final RandomAccessFile input;
37 private final InputStream stream;
38 private final Lock lock;
39 private final Consumer<HeaderBlock> onHeader;
40 private BlobHeader[] headers;
41
42 private final int start;
43 private int end = 0;
44 private int current = 0;
45
46 /**
47 * Constructs a BlobSpliterator with the specified parameters.
48 *
49 * @param input The {@link RandomAccessFile} used for reading blobs.
50 * @param onHeader A {@link Consumer} to handle the parsed {@link HeaderBlock}
51 * objects.
52 */
53 public BlobSpliterator(RandomAccessFile input, Consumer<HeaderBlock> onHeader) {
54 this.input = input;
55 this.onHeader = onHeader;
56 this.stream = new RandomAccessFileInputStream(input);
57 this.lock = new ReentrantLock();
58
59 List<BlobHeader> headerList = new ArrayList<>();
60 try {
61 while (input.getFilePointer() < input.length()) {
62 int headerLength = input.readInt();
63
64 if (headerLength > MAX_HEADER_SIZE)
65 throw new RuntimeException(
66 "blob header exceeds " + MAX_HEADER_SIZE + " bytes (" + headerLength + ")");
67
68 BlobHeader header = new BlobHeader().parse(stream, headerLength);
69
70 if (header.size > MAX_BLOB_SIZE)
71 throw new RuntimeException("blob exceeds " + MAX_BLOB_SIZE + " bytes (" + header.size + ")");
72
73 header.offset = input.getFilePointer();
74 headerList.add(header);
75
76 input.skipBytes(header.size);
77 }
78 } catch (IOException e) {
79 throw new RuntimeException(e);
80 }
81
82 headers = new BlobHeader[headerList.size()];
83 headerList.toArray(headers);
84
85 start = 0;
86 current = 0;
87 end = headers.length;
88 }
89
90 private BlobSpliterator(RandomAccessFile input, Lock lock, Consumer<HeaderBlock> onHeader, BlobHeader[] headers,
91 int start, int end) {
92 this.input = input;
93 this.lock = lock;
94 this.onHeader = onHeader;
95 this.stream = new RandomAccessFileInputStream(input);
96 this.headers = headers;
97 this.start = start;
98 this.current = start;
99 this.end = end;
100 }
101
102 @Override
103 public Spliterator<List<Entity>> trySplit() {
104 // locking because of end
105 lock.lock();
106 if (current < end - 1)
107 return null;
108
109 int mid = (end - start) / 2;
110 int otherEnd = end;
111
112 end = start + mid;
113 lock.unlock();
114
115 return new BlobSpliterator(input, lock, onHeader, headers, end, otherEnd);
116 }
117
118 @Override
119 public boolean tryAdvance(Consumer<? super List<Entity>> action) {
120 // locking because of input, end
121 lock.lock();
122
123 if (current >= end)
124 return false;
125
126 BlobHeader header = headers[current++];
127
128 try {
129 input.seek(header.offset);
130 } catch (IOException e) {
131 throw new RuntimeException(e);
132 }
133
134 Blob blob = new Blob(header.headerType).parse(stream, header.size);
135
136 // unlocking, every critical variable is unused
137 lock.unlock();
138
139 if (blob.header != null)
140 onHeader.accept(blob.header);
141
142 if (blob.primitive != null)
143 action.accept(blob.primitive);
144
145 return true;
146 }
147
148 @Override
149 public long estimateSize() {
150 return end - current;
151 }
152
153 @Override
154 public int characteristics() {
155 return DISTINCT | SUBSIZED | SIZED | NONNULL | IMMUTABLE | CONCURRENT;
156 }
157
158 /**
159 * Creates a sequential or parallel {@link Stream} from this
160 * {@link BlobSpliterator}.
161 *
162 * @param parallel A boolean indicating whether the stream should be parallel.
163 * @return A {@link Stream} of lists of {@link Entity} objects.
164 */
165 public Stream<List<Entity>> stream(boolean parallel) {
166 return StreamSupport.stream(this, parallel);
167 }
168
169 /**
170 * Creates a sequential stream from this {@link BlobSpliterator}.
171 *
172 * @return A sequential {@link Stream} of lists of {@link Entity} objects.
173 */
174 public Stream<List<Entity>> stream() {
175 return stream(false);
176 }
177
178 /**
179 * Creates a parallel stream from this {@link BlobSpliterator}.
180 *
181 * @return A parallel {@link Stream} of lists of {@link Entity} objects.
182 */
183 public Stream<List<Entity>> parallelStream() {
184 return stream(true);
185 }
186}