misc/persolijn

osm-protobuf/src/main/java/osm/protobuf/BlobSpliterator.java in master
Repositories | Summary | Log | Files

BlobSpliterator.java (5511B) download


  1package osm.protobuf;
  2
  3import java.io.IOException;
  4import java.io.InputStream;
  5import java.io.RandomAccessFile;
  6import java.util.ArrayList;
  7import java.util.List;
  8import java.util.Spliterator;
  9import java.util.concurrent.locks.Lock;
 10import java.util.concurrent.locks.ReentrantLock;
 11import java.util.function.Consumer;
 12import java.util.stream.Stream;
 13import java.util.stream.StreamSupport;
 14
 15import osm.common.RandomAccessFileInputStream;
 16import osm.message.Blob;
 17import osm.message.BlobHeader;
 18import osm.message.Entity;
 19import osm.message.HeaderBlock;
 20
 21/**
 22 * A {@link Spliterator} implementation for iterating over lists of
 23 * {@link Entity} objects parsed from OpenStreetMap blobs.
 24 */
 25public class BlobSpliterator implements Spliterator<List<Entity>> {
 26    /**
 27     * BlobHeader is never bigger than 64K.
 28     */
 29    private static final int MAX_HEADER_SIZE = 64 * 1024;
 30
 31    /**
 32     * Blob is never bigger than 32M.
 33     */
 34    private static final int MAX_BLOB_SIZE = 32 * 1024 * 1024;
 35
 36    private final RandomAccessFile input;
 37    private final InputStream stream;
 38    private final Lock lock;
 39    private final Consumer<HeaderBlock> onHeader;
 40    private BlobHeader[] headers;
 41
 42    private final int start;
 43    private int end = 0;
 44    private int current = 0;
 45
 46    /**
 47     * Constructs a BlobSpliterator with the specified parameters.
 48     *
 49     * @param input    The {@link RandomAccessFile} used for reading blobs.
 50     * @param onHeader A {@link Consumer} to handle the parsed {@link HeaderBlock}
 51     *                 objects.
 52     */
 53    public BlobSpliterator(RandomAccessFile input, Consumer<HeaderBlock> onHeader) {
 54        this.input = input;
 55        this.onHeader = onHeader;
 56        this.stream = new RandomAccessFileInputStream(input);
 57        this.lock = new ReentrantLock();
 58
 59        List<BlobHeader> headerList = new ArrayList<>();
 60        try {
 61            while (input.getFilePointer() < input.length()) {
 62                int headerLength = input.readInt();
 63
 64                if (headerLength > MAX_HEADER_SIZE)
 65                    throw new RuntimeException(
 66                            "blob header exceeds " + MAX_HEADER_SIZE + " bytes (" + headerLength + ")");
 67
 68                BlobHeader header = new BlobHeader().parse(stream, headerLength);
 69
 70                if (header.size > MAX_BLOB_SIZE)
 71                    throw new RuntimeException("blob exceeds " + MAX_BLOB_SIZE + " bytes (" + header.size + ")");
 72
 73                header.offset = input.getFilePointer();
 74                headerList.add(header);
 75
 76                input.skipBytes(header.size);
 77            }
 78        } catch (IOException e) {
 79            throw new RuntimeException(e);
 80        }
 81
 82        headers = new BlobHeader[headerList.size()];
 83        headerList.toArray(headers);
 84
 85        start = 0;
 86        current = 0;
 87        end = headers.length;
 88    }
 89
 90    private BlobSpliterator(RandomAccessFile input, Lock lock, Consumer<HeaderBlock> onHeader, BlobHeader[] headers,
 91            int start, int end) {
 92        this.input = input;
 93        this.lock = lock;
 94        this.onHeader = onHeader;
 95        this.stream = new RandomAccessFileInputStream(input);
 96        this.headers = headers;
 97        this.start = start;
 98        this.current = start;
 99        this.end = end;
100    }
101
102    @Override
103    public Spliterator<List<Entity>> trySplit() {
104        // locking because of end
105        lock.lock();
106        if (current < end - 1)
107            return null;
108
109        int mid = (end - start) / 2;
110        int otherEnd = end;
111
112        end = start + mid;
113        lock.unlock();
114
115        return new BlobSpliterator(input, lock, onHeader, headers, end, otherEnd);
116    }
117
118    @Override
119    public boolean tryAdvance(Consumer<? super List<Entity>> action) {
120        // locking because of input, end
121        lock.lock();
122
123        if (current >= end)
124            return false;
125
126        BlobHeader header = headers[current++];
127
128        try {
129            input.seek(header.offset);
130        } catch (IOException e) {
131            throw new RuntimeException(e);
132        }
133
134        Blob blob = new Blob(header.headerType).parse(stream, header.size);
135
136        // unlocking, every critical variable is unused
137        lock.unlock();
138
139        if (blob.header != null)
140            onHeader.accept(blob.header);
141
142        if (blob.primitive != null)
143            action.accept(blob.primitive);
144
145        return true;
146    }
147
148    @Override
149    public long estimateSize() {
150        return end - current;
151    }
152
153    @Override
154    public int characteristics() {
155        return DISTINCT | SUBSIZED | SIZED | NONNULL | IMMUTABLE | CONCURRENT;
156    }
157
158    /**
159     * Creates a sequential or parallel {@link Stream} from this
160     * {@link BlobSpliterator}.
161     *
162     * @param parallel A boolean indicating whether the stream should be parallel.
163     * @return A {@link Stream} of lists of {@link Entity} objects.
164     */
165    public Stream<List<Entity>> stream(boolean parallel) {
166        return StreamSupport.stream(this, parallel);
167    }
168
169    /**
170     * Creates a sequential stream from this {@link BlobSpliterator}.
171     *
172     * @return A sequential {@link Stream} of lists of {@link Entity} objects.
173     */
174    public Stream<List<Entity>> stream() {
175        return stream(false);
176    }
177
178    /**
179     * Creates a parallel stream from this {@link BlobSpliterator}.
180     *
181     * @return A parallel {@link Stream} of lists of {@link Entity} objects.
182     */
183    public Stream<List<Entity>> parallelStream() {
184        return stream(true);
185    }
186}