The Grit in Processing Unicode Strings with NDJSON

Unicode is pretty amazing: you can encode strings in single-byte or multibyte characters. Perhaps a smile… 😀, which is U+1F600. It’s pretty cool, so cool you should read If UTF-8 is an 8-bit encoding, why does it need 1-4 bytes?, which shows the four key byte sequences for UTF-8:

   Char. number range  |        UTF-8 octet sequence
      (hexadecimal)    |              (binary)
   --------------------+---------------------------------------------
   0000 0000-0000 007F | 0xxxxxxx
   0000 0080-0000 07FF | 110xxxxx 10xxxxxx
   0000 0800-0000 FFFF | 1110xxxx 10xxxxxx 10xxxxxx
   0001 0000-0010 FFFF | 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
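The table is easy to confirm from Java by checking encoded byte lengths (a minimal sketch; the class and method names are mine):

```java
import java.nio.charset.StandardCharsets;

public class Utf8Lengths {
    // Returns how many bytes UTF-8 needs to encode the string,
    // matching the octet-sequence table above.
    public static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(utf8Length("A"));            // 1 byte  (U+0041)
        System.out.println(utf8Length("\u00E9"));       // 2 bytes (U+00E9, é)
        System.out.println(utf8Length("\u20AC"));       // 3 bytes (U+20AC, €)
        System.out.println(utf8Length("\uD83D\uDE00")); // 4 bytes (U+1F600, 😀)
    }
}
```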

Until recently, I’ve been working with NDJSON files as part of the HL7 FHIR: Bulk Data Access IG to export healthcare data and the proposed Import specification to import healthcare data. These files store one JSON object per line, delimited with a \n, such as:

{"resourceType":"Patient"}
{"resourceType":"Patient"}
{"resourceType":"Patient"}
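For a quick look, a plain BufferedReader handles these files line by line; the catch for a checkpointing batch job, as we’ll see, is that readLine() hides how many bytes were consumed (a minimal sketch, names are mine):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class NaiveNdjson {
    // Naive NDJSON reading: fine for small files, but readLine() gives no
    // byte counts, which a checkpointing batch job needs for restartability.
    public static List<String> readAll(String ndjson) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new StringReader(ndjson))) {
            String line;
            while ((line = br.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        String ndjson = "{\"resourceType\":\"Patient\"}\n{\"resourceType\":\"Patient\"}\n";
        System.out.println(readAll(ndjson).size()); // 2
    }
}
```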

The following Java snippet generates a substantial set of newline-delimited lines, covering a wide range of Unicode code points, that can be injected into a stream for testing.

StringBuilder line = new StringBuilder();
for (int codePoint = 32; codePoint <= 0x1F64F; codePoint++) {
    // Skip the surrogate range (U+D800-U+DFFF); lone surrogates are not
    // encodable characters and would corrupt the test data.
    if (codePoint >= Character.MIN_SURROGATE && codePoint <= Character.MAX_SURROGATE) {
        continue;
    }
    line.append(Character.toChars(codePoint));
    if (codePoint % 64 == 0) {
        line.append("\n");
    }
}
System.out.println(line.toString());

This data is processed asynchronously on OpenLiberty: JavaBatch as a set of jobs. These jobs process data through a Read(Source)-Checkpoint-Write(Sink) pattern, which ensures enough data is read from the source before a write action on the sink.
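In sketch form, the chunk pattern looks roughly like this (hypothetical names; not the actual JavaBatch ItemReader/ItemWriter API):

```java
import java.util.ArrayList;
import java.util.List;

public class ReadCheckpointWrite {
    // Hypothetical sketch of the read-checkpoint-write chunk pattern:
    // read items from a source and only write to the sink once a
    // checkpoint-sized batch has accumulated.
    interface Reader { String read(); }           // returns null at end of data
    interface Writer { void write(List<String> items); }

    public static void run(Reader reader, Writer writer, int checkpointSize) {
        List<String> batch = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            batch.add(item);
            if (batch.size() >= checkpointSize) {
                writer.write(batch);              // checkpoint: flush to the sink
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            writer.write(batch);                  // final partial batch
        }
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>(List.of("a", "b", "c", "d", "e"));
        List<List<String>> sink = new ArrayList<>();
        run(() -> source.isEmpty() ? null : source.remove(0), sink::add, 2);
        System.out.println(sink.size()); // 3 writes: [a,b], [c,d], [e]
    }
}
```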

I found that processing variable data with an unknown Unicode set needed a counting stream to keep track of the bytes. The CountingStream acts as a delegate that accumulates bytes, tracks the length of the processed values, and finds the end of a line or the end of the file.

public static class CountingStream extends InputStream {
        private static final int LF = '\n';
        private static final long MAX_LENGTH_PER_LINE = 2147483648L;

        // 256kb block
        private ByteArrayOutputStream out = new ByteArrayOutputStream(256000);
        private boolean eol = false;
        private long length = 0;

        private InputStream delegate;

        /**
         * ctor
         * @param in
         */
        public CountingStream(InputStream in) {
            this.delegate = in;
        }

        /**
         * reset the line
         */
        public void resetLine() {
            out.reset();
            eol = false;
        }

        /**
         * @return the length of the resources returned in the reader
         */
        public long getLength() {
            return length;
        }

        /**
         * Gets the String representing the line of bytes.
         * 
         * @return
         * @throws UnsupportedEncodingException
         */
        public String getLine() throws UnsupportedEncodingException {
            String str = new String(out.toByteArray(), "UTF-8");
            if (str.isEmpty()) {
                str = null;
            }
            return str;
        }

        public boolean eol() {
            return eol;
        }

        /**
         * Returns the line that is aggregated up until a newline character,
         * or the remaining bytes when the end of the stream is reached.
         * @return the next line, or null if no bytes remain
         * @throws IOException
         */
        public String readLine() throws IOException {
            int r = read();
            while (r != -1) {
                if (eol()) {
                    eol = false;
                    return getLine();
                }
                r = read();
            }
            // End of stream: return whatever was accumulated (null if empty).
            return getLine();
        }

        @Override
        public int read() throws IOException {
            int r = delegate.read();
            if (r == -1) {
                return -1;
            }
            length++;
            if (r == LF) {
                eol = true;
            } else {
                if (length == MAX_LENGTH_PER_LINE) {
                    throw new IOException("Current Line in NDJSON exceeds limit " + MAX_LENGTH_PER_LINE);
                }
                out.write(r);
            }
            // Return the unsigned byte value (0-255) per the InputStream contract;
            // returning the signed byte cast would be negative for multibyte data.
            return r;
        }
    }
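In miniature, the accumulate-then-decode approach looks like this (a simplified sketch of the same idea, not the class above; names are mine):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class MiniCountingStream {
    // A miniature of the CountingStream idea: accumulate raw bytes until LF,
    // then decode the whole line as UTF-8 in one step so multibyte
    // characters are never split.
    public static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int r;
        while ((r = in.read()) != -1) {
            if (r == '\n') {
                return buf.toString(StandardCharsets.UTF_8.name());
            }
            buf.write(r);
        }
        // End of stream: return the trailing line, or null if nothing remains.
        return buf.size() > 0 ? buf.toString(StandardCharsets.UTF_8.name()) : null;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "{\"name\":\"\uD83D\uDE00\"}\n{\"name\":\"\u00E9\"}\n"
                .getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(data);
        String line;
        while ((line = readLine(in)) != null) {
            System.out.println(line);
        }
    }
}
```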

I found one important thing in the delegate, thanks to a colleague and CERT: you must accumulate the bytes and enforce a maximum size per line. The CERT article is STR50-J. Use the appropriate method for counting characters in a string.

The grit here is:

  1. Accumulate: Don’t process one character at a time in int read(); accumulate your bytes and defer to Java's String creation to ensure the data is decoded in your project’s encoding.
  2. Set a limit: Don’t infinitely process the data, stop when it violates a set contract.
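The first point is easy to demonstrate: decoding a multibyte character from a partial byte sequence corrupts it, while accumulating all of its bytes first round-trips cleanly (a small sketch; names are mine):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SplitMultibyte {
    public static void main(String[] args) {
        // U+1F600 encodes to 4 bytes in UTF-8: F0 9F 98 80
        byte[] smile = "\uD83D\uDE00".getBytes(StandardCharsets.UTF_8);

        // Decoding only the first two bytes yields U+FFFD replacement characters:
        String broken = new String(Arrays.copyOfRange(smile, 0, 2), StandardCharsets.UTF_8);
        System.out.println(broken.contains("\uFFFD")); // true

        // Accumulating all four bytes before decoding recovers the character:
        String whole = new String(smile, StandardCharsets.UTF_8);
        System.out.println(whole.equals("\uD83D\uDE00")); // true
    }
}
```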

If you are doing more complicated processing, say streaming from Azure Blob, Amazon S3, or HTTPS, and need to process the stream as chunks, you’ll want to do something a bit more involved.

The grit here is:

  1. Read Blocks and not the whole stream: Read a block of bytes at a time instead of ‘draining’ the bytes when a sufficient block is retrieved.
  2. Assemble Lines in multiple Block reads.

The code looks like this:

    public static class CountingStream extends InputStream {
        private static final int LF = '\n';
        private static final long MAX_LENGTH_PER_LINE = 2147483648L;

        // Caches the line bytes across reads; allocated by the caller
        private ByteArrayOutputStream out;
        private long length = 0;

        private InputStream delegate;

        /**
         * 
         * @param out ByteArrayOutputStream caches the data cross reads
         * @param in InputStream is generally the S3InputStream
         */
        public CountingStream(ByteArrayOutputStream out, InputStream in) {
            this.out = out;
            this.delegate = in;
        }

        /**
         * Gets the String representing the line of bytes.
         * 
         * @return
         * @throws UnsupportedEncodingException
         */
        public String getLine() throws UnsupportedEncodingException {
            String str = new String(out.toByteArray(), "UTF-8");
            if (str.isEmpty()) {
                str = null;
            }
            return str;
        }

        @Override
        public int read() throws IOException {
            return delegate.read();
        }

        /**
         * drains the stream so we don't leave a hanging connection
         * @throws IOException
         */
        public void drain() throws IOException {
            int l = delegate.read();
            while (l != -1) {
                l = delegate.read();
            }
        }

        /**
         * Returns the next assembled line, carrying partial bytes across chunks.
         * @return the next complete line, or null when the chunk is exhausted
         *         (a partial trailing line stays cached in the buffer)
         * @throws IOException
         */
        public String readLine() throws IOException {
            int r = read();
            if (r == -1) {
                return null;
            } else {
                String result = null;
                while (r != -1) {
                    byte b = (byte) r;
                    if (LF == (int) b) {
                        length++;
                        r = -1;
                        result = getLine();
                        out.reset();
                    } else {
                        length++;
                        if (length == MAX_LENGTH_PER_LINE) {
                            throw new IOException("Current Line in NDJSON exceeds limit " + MAX_LENGTH_PER_LINE);
                        }
                        out.write(b);
                        r = read();
                    }
                }
                return result;
            }
        }
    }
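The carry-across-chunks behavior can be sketched in miniature: because the caller owns the buffer, a line split across two ranged reads is completed by the next chunk (a simplified sketch; names are mine):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedLines {
    // The caller owns 'carry', so a line split across two chunks is
    // completed when the next chunk is read.
    public static List<String> readLines(ByteArrayOutputStream carry, InputStream chunk)
            throws IOException {
        List<String> lines = new ArrayList<>();
        int r;
        while ((r = chunk.read()) != -1) {
            if (r == '\n') {
                lines.add(carry.toString(StandardCharsets.UTF_8.name()));
                carry.reset();
            } else {
                carry.write(r);
            }
        }
        return lines; // a partial trailing line stays in 'carry' for the next chunk
    }

    public static void main(String[] args) throws IOException {
        byte[] all = "{\"a\":1}\n{\"b\":2}\n".getBytes(StandardCharsets.UTF_8);
        // Split mid-line, as a ranged S3 GET might:
        byte[] first = Arrays.copyOfRange(all, 0, 10);
        byte[] second = Arrays.copyOfRange(all, 10, all.length);

        ByteArrayOutputStream carry = new ByteArrayOutputStream();
        System.out.println(readLines(carry, new ByteArrayInputStream(first)));  // [{"a":1}]
        System.out.println(readLines(carry, new ByteArrayInputStream(second))); // [{"b":2}]
    }
}
```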

Importantly, the code defers the caching to the external caller, which in this case assembles a window of resources:

    protected void readFromObjectStoreWithLowMaxRange(AmazonS3 c, String b, String workItem) throws FHIRException {

        // Don't add tempResources to resources until we're done (we do retry), it's a temporary cache of the Resources
        List<Resource> tempResources = new ArrayList<>();

        // number of bytes read.
        long numberOfBytesRead = 0l;
        int totalReads = 0;
        int mux = 0;

        // The cached FHIRParserException
        FHIRParserException fpeDownstream = null;

        // Closed when the Scope is out. The size is double the read window.
        // The backing array is allocated at creation.
        ByteArrayOutputStream cacheOut = new ByteArrayOutputStream(512000);
        boolean complete = false;
        while (!complete) {
            // Condition: At the end of the file... and it should never be more than the file Size
            // however, in rare circumstances the person may have 'grown' or added to the file
            // while operating on the $import and we want to defensively end rather than an exact match
            // Early exit from the loop...
            long start = this.transientUserData.getCurrentBytes();
            if (this.transientUserData.getImportFileSize() <= start) {
                complete = true; // NOP
                break;
            }

            // Condition: Window would exceed the maximum File Size
            // Prune the end to -1 off the maximum.
            // The following is 256K window. 256K is used so we only drain a portion of the inputstream.
            // and not the whole file's input stream.
            long end = start + 256000;
            if (end >= this.transientUserData.getImportFileSize()) {
                end = this.transientUserData.getImportFileSize() - 1;
                complete = true; // We still need to process the bytes.
            }

            // Request the start and end of the S3ObjectInputStream that's going to be retrieved
            GetObjectRequest req = new GetObjectRequest(b, workItem)
                                            .withRange(start, end);

            if (LOG.isLoggable(Level.FINE)) {
                // Useful when debugging edge of the stream problems
                LOG.fine("S3ObjectInputStream --- " + start + " " + end);
            }

            boolean parsedWithIssue = false;
            try (S3Object obj = c.getObject(req);
                    S3ObjectInputStream in = obj.getObjectContent();
                    BufferedInputStream buffer = new BufferedInputStream(in);
                    CountingStream reader = new CountingStream(cacheOut, in)) {

                // The interior block allows a drain operation to be executed finally.
                // as a best practice we want to drain the remainder of the input
                // this drain should be at worst 255K (-1 for new line character)
                try {
                    String resourceStr = reader.readLine();
                    // The first line is a large resource
                    if (resourceStr == null) {
                        this.transientUserData.setCurrentBytes(this.transientUserData.getCurrentBytes() + reader.length);
                        reader.length = 0;
                        mux++;
                    }

                    while (resourceStr != null && totalReads < maxRead) {
                        try (StringReader stringReader = new StringReader(resourceStr)) {
                            tempResources.add(FHIRParser.parser(Format.JSON).parse(stringReader));
                        } catch (FHIRParserException fpe) {
                            // Log and skip the invalid FHIR resource.
                            parseFailures++;
                            parsedWithIssue = true;
                            fpeDownstream = fpe;
                        }

                        long priorLineLength = reader.length;
                        reader.length = 0;
                        resourceStr = reader.readLine();

                        if (!parsedWithIssue) {
                            this.transientUserData.setCurrentBytes(this.transientUserData.getCurrentBytes() + priorLineLength);
                            numberOfBytesRead += reader.length;
                            totalReads++;
                        } else if ((parsedWithIssue && resourceStr != null)
                                || (parsedWithIssue && 
                                        (this.transientUserData.getImportFileSize() <= this.transientUserData.getCurrentBytes() + priorLineLength))) { 
                            // This is potentially end of bad line
                            // -or-
                            // This is the last line failing to parse
                            long line = this.transientUserData.getNumOfProcessedResources() + totalReads;
                            LOG.log(Level.SEVERE, "readResources: Failed to parse line " + totalReads + " of [" + workItem + "].", fpeDownstream);
                            String msg = "readResources: " + "Failed to parse line " + line + " of [" + workItem + "].";

                            ConfigurationAdapter adapter = ConfigurationFactory.getInstance();
                            String out = adapter.getOperationOutcomeProvider(source);
                            boolean collectImportOperationOutcomes = adapter.shouldStorageProviderCollectOperationOutcomes(source)
                                    && !StorageType.HTTPS.equals(adapter.getStorageProviderStorageType(out));
                            if (collectImportOperationOutcomes) {
                                FHIRGenerator.generator(Format.JSON)
                                    .generate(generateException(line, msg),
                                            transientUserData.getBufferStreamForImportError());
                                transientUserData.getBufferStreamForImportError().write(NDJSON_LINESEPERATOR);
                            }
                        }
                    }
                } catch (Exception ex) {
                    LOG.warning("readFhirResourceFromObjectStore: Error processing file [" + workItem + "] - " + ex.getMessage());
                    // Throw exception to fail the job, the job can be continued from the current checkpoint after the
                    // problem is solved.
                    throw new FHIRException("Unable to read from S3 during processing", ex);
                } finally {
                    try {
                        reader.drain();
                    } catch (Exception s3e) {
                        LOG.fine(() -> "Error while draining the stream, this is benign");
                        LOG.throwing("S3Provider", "readFromObjectStoreWithLowMaxRange", s3e);
                    }
                }

                // Increment if the last line fails
                if (this.transientUserData.getImportFileSize() <= this.transientUserData.getCurrentBytes()) {
                    parseFailures++;
                }
            } catch (FHIRException fe) {
                throw fe;
            } catch (Exception e) {
                throw new FHIRException("Unable to read from S3 File", e);
            }

            // Condition: The optimized block and the number of Resources read
            // exceed the minimum thresholds or the maximum size of a single resource
            if (tempResources.size() >= maxRead) {
                LOG.fine("TempResourceSize " + tempResources.size());
                complete = true;
            }

            // Condition: The optimized block is exceeded and the number of resources is
            // only one so we want to threshold a maximum number of resources
            // 512K * 5 segments (we don't want to repeat too much work) = 2.6M
            if (numberOfBytesRead > 2621440 && tempResources.size() >= 1) {
                complete = true;
            }

            // Condition: The maximum line length is exceeded and we have at least one Resource
            // 2147483648 / 262144 (256K window) = 8192 reads
            if (mux == 8193) {
                throw new FHIRException("Too Long a Line");
            }

            // We've read more than one window
            if (mux > 1 && tempResources.size() >= 1) {
                break;
            }
        }

        // Condition: There is no complete resource to read.
        if (totalReads == 0) {
            LOG.warning("File grew since the start");
            this.transientUserData.setCurrentBytes(this.transientUserData.getImportFileSize());
        }

        // Add the accumulated resources
        this.resources.addAll(tempResources);
    }

The above code was created and licensed as part of the IBM/FHIR project.

Net: approach Unicode formats carefully, and be careful when reassembling bytes and reading windows from channels.
