/*  Title:      Pure/General/byte_message.scala
    Author:     Makarius

Byte-oriented messages.
*/

package isabelle

import java.io.{ByteArrayOutputStream, OutputStream, InputStream, IOException}


object Byte_Message
{
  /* output operations */

  // Write all given items to the stream, in list order; does not flush.
  def write(stream: OutputStream, bytes: List[Bytes]): Unit =
    bytes.foreach(_.write_stream(stream))

  // Flush the stream; an IOException raised by the flush is ignored.
  def flush(stream: OutputStream): Unit =
    try { stream.flush() }
    catch { case _: IOException => }

  // Write the given bytes followed by Bytes.newline, then flush.
  def write_line(stream: OutputStream, bytes: Bytes): Unit =
  {
    write(stream, List(bytes, Bytes.newline))
    flush(stream)
  }


  /* input operations */

  // Read from the stream via Bytes.read_stream with limit n.
  // NOTE(review): presumably the result is shorter than n when the stream
  // ends early (read_block below relies on that) -- confirm in Bytes.
  def read(stream: InputStream, n: Int): Bytes =
    Bytes.read_stream(stream, limit = n)

  // Read a block of n bytes.  The first component is Some(block) only if
  // exactly n bytes were obtained, otherwise None; the second component is
  // the number of bytes actually obtained.
  def read_block(stream: InputStream, n: Int): (Option[Bytes], Int) =
  {
    val msg = read(stream, n)
    val len = msg.length
    (if (len == n) Some(msg) else None, len)
  }

  // Read one line: bytes up to (excluding) the next LF (10) or end-of-stream.
  // Result is None if end-of-stream is hit before any byte was read; a single
  // trailing CR (13) is removed from the line.
  def read_line(stream: InputStream): Option[Bytes] =
  {
    val line = new ByteArrayOutputStream(100)
    var c = 0
    while ({ c = stream.read(); c != -1 && c != 10 }) line.write(c)

    if (c == -1 && line.size() == 0) None
    else {
      val a = line.toByteArray
      val n = a.length
      val len = if (n > 0 && a(n - 1) == 13) n - 1 else n
      Some(Bytes(a, 0, len))
    }
  }


  /* messages with multiple chunks (arbitrary content) */

  // Header line: comma-separated chunk lengths, followed by newline.
  private def make_header(ns: List[Int]): List[Bytes] =
    List(Bytes(ns.mkString(",")), Bytes.newline)

  // Write header (lengths of all chunks) followed by the chunks, then flush.
  def write_message(stream: OutputStream, chunks: List[Bytes]): Unit =
  {
    write(stream, make_header(chunks.map(_.length)) ::: chunks)
    flush(stream)
  }

  // Parse a header line into chunk lengths (',' separated, each parsed by
  // Value.Nat.parse); an ERROR from parsing is re-raised as a
  // "Malformed message header" error that quotes the offending line.
  private def parse_header(line: String): List[Int] =
    try { space_explode(',', line).map(Value.Nat.parse) }
    catch { case ERROR(_) => error("Malformed message header: " + quote(line)) }

  // Read a chunk of exactly n bytes; fails with an error if read_block
  // obtained fewer bytes than requested.
  private def read_chunk(stream: InputStream, n: Int): Bytes =
    read_block(stream, n) match {
      case (Some(chunk), _) => chunk
      case (None, len) =>
        error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes")
    }

  // Read a message: header line first, then one chunk per header entry, in
  // order.  Result is None if no header line is available (see read_line).
  def read_message(stream: InputStream): Option[List[Bytes]] =
    read_line(stream).map(line => parse_header(line.text).map(read_chunk(stream, _)))


  /* hybrid messages: line or length+block (restricted content) */

  // Non-empty content consisting only of ASCII digits (per Symbol.is_ascii_digit):
  // such a line would be taken for a length header by read_line_message.
  private def is_length(msg: Bytes): Boolean =
    !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar))

  // Non-empty content whose last character is an ASCII line terminator
  // (per Symbol.is_ascii_line_terminator).
  private def is_terminated(msg: Bytes): Boolean =
  {
    val len = msg.length
    len > 0 && Symbol.is_ascii_line_terminator(msg.charAt(len - 1))
  }

  // Write msg followed by newline, then flush.  A length header is emitted
  // first if msg is longer than 100 bytes or contains LF (10); the announced
  // length is n + 1 because it covers the trailing newline as well.
  // Content that looks like a length, or that ends with a line terminator,
  // is rejected with an error.
  def write_line_message(stream: OutputStream, msg: Bytes): Unit =
  {
    if (is_length(msg) || is_terminated(msg))
      error("Bad content for line message:\n" ++ msg.text.take(100))

    val n = msg.length
    write(stream,
      (if (n > 100 || msg.iterator.contains(10)) make_header(List(n + 1)) else Nil) :::
        List(msg, Bytes.newline))
    flush(stream)
  }

  // Read a hybrid message: a first line that is accepted by Value.Nat.unapply
  // is taken as the length of a following block, which is read and passed
  // through trim_line; any other line is the message itself.  Result is None
  // if no line is available, or if the announced block could not be read in
  // full.
  // NOTE(review): trim_line presumably drops the trailing line terminator
  // that write_line_message includes in the announced length -- confirm in Bytes.
  def read_line_message(stream: InputStream): Option[Bytes] =
    read_line(stream) match {
      case None => None
      case Some(line) =>
        Value.Nat.unapply(line.text) match {
          case None => Some(line)
          case Some(n) => read_block(stream, n)._1.map(_.trim_line)
        }
    }
}