mirror of
https://github.com/rocky-linux/peridot.git
synced 2024-10-19 15:55:08 +00:00
507 lines
10 KiB
Go
507 lines
10 KiB
Go
|
package eventstream
|
||
|
|
||
|
import (
|
||
|
"encoding/base64"
|
||
|
"encoding/binary"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// maxHeaderValueLen is the largest header value length, in bytes, that
// encodeBytes and encodeString accept before returning a LengthError.
const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1
|
||
|
|
||
|
// valueType is the EventStream header value type.
type valueType uint8

// Header value types
const (
	trueValueType valueType = iota
	falseValueType
	int8ValueType  // Byte
	int16ValueType // Short
	int32ValueType // Integer
	int64ValueType // Long
	bytesValueType
	stringValueType
	timestampValueType
	uuidValueType
)

// String returns a short name for the value type. Both boolean tags share
// the name "bool"; a tag outside the declared set is reported together with
// its numeric value.
func (t valueType) String() string {
	switch t {
	case trueValueType, falseValueType:
		return "bool"
	case int8ValueType:
		return "int8"
	case int16ValueType:
		return "int16"
	case int32ValueType:
		return "int32"
	case int64ValueType:
		return "int64"
	case bytesValueType:
		return "byte_array"
	case stringValueType:
		return "string"
	case timestampValueType:
		return "timestamp"
	case uuidValueType:
		return "uuid"
	}
	return fmt.Sprintf("unknown value type %d", uint8(t))
}
|
||
|
|
||
|
type rawValue struct {
|
||
|
Type valueType
|
||
|
Len uint16 // Only set for variable length slices
|
||
|
Value []byte // byte representation of value, BigEndian encoding.
|
||
|
}
|
||
|
|
||
|
func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
|
||
|
return binaryWriteFields(w, binary.BigEndian,
|
||
|
r.Type,
|
||
|
v,
|
||
|
)
|
||
|
}
|
||
|
|
||
|
func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
|
||
|
binary.Write(w, binary.BigEndian, r.Type)
|
||
|
|
||
|
_, err := w.Write(v)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
|
||
|
if len(v) > maxHeaderValueLen {
|
||
|
return LengthError{
|
||
|
Part: "header value",
|
||
|
Want: maxHeaderValueLen, Have: len(v),
|
||
|
Value: v,
|
||
|
}
|
||
|
}
|
||
|
r.Len = uint16(len(v))
|
||
|
|
||
|
err := binaryWriteFields(w, binary.BigEndian,
|
||
|
r.Type,
|
||
|
r.Len,
|
||
|
)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
_, err = w.Write(v)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (r rawValue) encodeString(w io.Writer, v string) error {
|
||
|
if len(v) > maxHeaderValueLen {
|
||
|
return LengthError{
|
||
|
Part: "header value",
|
||
|
Want: maxHeaderValueLen, Have: len(v),
|
||
|
Value: v,
|
||
|
}
|
||
|
}
|
||
|
r.Len = uint16(len(v))
|
||
|
|
||
|
type stringWriter interface {
|
||
|
WriteString(string) (int, error)
|
||
|
}
|
||
|
|
||
|
err := binaryWriteFields(w, binary.BigEndian,
|
||
|
r.Type,
|
||
|
r.Len,
|
||
|
)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if sw, ok := w.(stringWriter); ok {
|
||
|
_, err = sw.WriteString(v)
|
||
|
} else {
|
||
|
_, err = w.Write([]byte(v))
|
||
|
}
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// decodeFixedBytesValue fills buf completely from r. It returns the error
// from io.ReadFull when fewer than len(buf) bytes could be read.
func decodeFixedBytesValue(r io.Reader, buf []byte) error {
	if _, err := io.ReadFull(r, buf); err != nil {
		return err
	}
	return nil
}
|
||
|
|
||
|
func decodeBytesValue(r io.Reader) ([]byte, error) {
|
||
|
var raw rawValue
|
||
|
var err error
|
||
|
raw.Len, err = decodeUint16(r)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
buf := make([]byte, raw.Len)
|
||
|
_, err = io.ReadFull(r, buf)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return buf, nil
|
||
|
}
|
||
|
|
||
|
func decodeStringValue(r io.Reader) (string, error) {
|
||
|
v, err := decodeBytesValue(r)
|
||
|
return string(v), err
|
||
|
}
|
||
|
|
||
|
// Value represents the abstract header value.
|
||
|
type Value interface {
|
||
|
Get() interface{}
|
||
|
String() string
|
||
|
valueType() valueType
|
||
|
encode(io.Writer) error
|
||
|
}
|
||
|
|
||
|
// An BoolValue provides eventstream encoding, and representation
|
||
|
// of a Go bool value.
|
||
|
type BoolValue bool
|
||
|
|
||
|
// Get returns the underlying type
|
||
|
func (v BoolValue) Get() interface{} {
|
||
|
return bool(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (v BoolValue) valueType() valueType {
|
||
|
if v {
|
||
|
return trueValueType
|
||
|
}
|
||
|
return falseValueType
|
||
|
}
|
||
|
|
||
|
func (v BoolValue) String() string {
|
||
|
return strconv.FormatBool(bool(v))
|
||
|
}
|
||
|
|
||
|
// encode encodes the BoolValue into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v BoolValue) encode(w io.Writer) error {
|
||
|
return binary.Write(w, binary.BigEndian, v.valueType())
|
||
|
}
|
||
|
|
||
|
// An Int8Value provides eventstream encoding, and representation of a Go
|
||
|
// int8 value.
|
||
|
type Int8Value int8
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v Int8Value) Get() interface{} {
|
||
|
return int8(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (Int8Value) valueType() valueType {
|
||
|
return int8ValueType
|
||
|
}
|
||
|
|
||
|
func (v Int8Value) String() string {
|
||
|
return fmt.Sprintf("0x%02x", int8(v))
|
||
|
}
|
||
|
|
||
|
// encode encodes the Int8Value into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v Int8Value) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
|
||
|
return raw.encodeScalar(w, v)
|
||
|
}
|
||
|
|
||
|
func (v *Int8Value) decode(r io.Reader) error {
|
||
|
n, err := decodeUint8(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = Int8Value(n)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// An Int16Value provides eventstream encoding, and representation of a Go
|
||
|
// int16 value.
|
||
|
type Int16Value int16
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v Int16Value) Get() interface{} {
|
||
|
return int16(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (Int16Value) valueType() valueType {
|
||
|
return int16ValueType
|
||
|
}
|
||
|
|
||
|
func (v Int16Value) String() string {
|
||
|
return fmt.Sprintf("0x%04x", int16(v))
|
||
|
}
|
||
|
|
||
|
// encode encodes the Int16Value into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v Int16Value) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
return raw.encodeScalar(w, v)
|
||
|
}
|
||
|
|
||
|
func (v *Int16Value) decode(r io.Reader) error {
|
||
|
n, err := decodeUint16(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = Int16Value(n)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// An Int32Value provides eventstream encoding, and representation of a Go
|
||
|
// int32 value.
|
||
|
type Int32Value int32
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v Int32Value) Get() interface{} {
|
||
|
return int32(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (Int32Value) valueType() valueType {
|
||
|
return int32ValueType
|
||
|
}
|
||
|
|
||
|
func (v Int32Value) String() string {
|
||
|
return fmt.Sprintf("0x%08x", int32(v))
|
||
|
}
|
||
|
|
||
|
// encode encodes the Int32Value into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v Int32Value) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
return raw.encodeScalar(w, v)
|
||
|
}
|
||
|
|
||
|
func (v *Int32Value) decode(r io.Reader) error {
|
||
|
n, err := decodeUint32(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = Int32Value(n)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// An Int64Value provides eventstream encoding, and representation of a Go
|
||
|
// int64 value.
|
||
|
type Int64Value int64
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v Int64Value) Get() interface{} {
|
||
|
return int64(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (Int64Value) valueType() valueType {
|
||
|
return int64ValueType
|
||
|
}
|
||
|
|
||
|
func (v Int64Value) String() string {
|
||
|
return fmt.Sprintf("0x%016x", int64(v))
|
||
|
}
|
||
|
|
||
|
// encode encodes the Int64Value into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v Int64Value) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
return raw.encodeScalar(w, v)
|
||
|
}
|
||
|
|
||
|
func (v *Int64Value) decode(r io.Reader) error {
|
||
|
n, err := decodeUint64(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = Int64Value(n)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// An BytesValue provides eventstream encoding, and representation of a Go
|
||
|
// byte slice.
|
||
|
type BytesValue []byte
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v BytesValue) Get() interface{} {
|
||
|
return []byte(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (BytesValue) valueType() valueType {
|
||
|
return bytesValueType
|
||
|
}
|
||
|
|
||
|
func (v BytesValue) String() string {
|
||
|
return base64.StdEncoding.EncodeToString([]byte(v))
|
||
|
}
|
||
|
|
||
|
// encode encodes the BytesValue into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v BytesValue) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
|
||
|
return raw.encodeBytes(w, []byte(v))
|
||
|
}
|
||
|
|
||
|
func (v *BytesValue) decode(r io.Reader) error {
|
||
|
buf, err := decodeBytesValue(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = BytesValue(buf)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// An StringValue provides eventstream encoding, and representation of a Go
|
||
|
// string.
|
||
|
type StringValue string
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v StringValue) Get() interface{} {
|
||
|
return string(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (StringValue) valueType() valueType {
|
||
|
return stringValueType
|
||
|
}
|
||
|
|
||
|
func (v StringValue) String() string {
|
||
|
return string(v)
|
||
|
}
|
||
|
|
||
|
// encode encodes the StringValue into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v StringValue) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
|
||
|
return raw.encodeString(w, string(v))
|
||
|
}
|
||
|
|
||
|
func (v *StringValue) decode(r io.Reader) error {
|
||
|
s, err := decodeStringValue(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = StringValue(s)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// An TimestampValue provides eventstream encoding, and representation of a Go
|
||
|
// timestamp.
|
||
|
type TimestampValue time.Time
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v TimestampValue) Get() interface{} {
|
||
|
return time.Time(v)
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (TimestampValue) valueType() valueType {
|
||
|
return timestampValueType
|
||
|
}
|
||
|
|
||
|
func (v TimestampValue) epochMilli() int64 {
|
||
|
nano := time.Time(v).UnixNano()
|
||
|
msec := nano / int64(time.Millisecond)
|
||
|
return msec
|
||
|
}
|
||
|
|
||
|
func (v TimestampValue) String() string {
|
||
|
msec := v.epochMilli()
|
||
|
return strconv.FormatInt(msec, 10)
|
||
|
}
|
||
|
|
||
|
// encode encodes the TimestampValue into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v TimestampValue) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
|
||
|
msec := v.epochMilli()
|
||
|
return raw.encodeScalar(w, msec)
|
||
|
}
|
||
|
|
||
|
func (v *TimestampValue) decode(r io.Reader) error {
|
||
|
n, err := decodeUint64(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
*v = TimestampValue(timeFromEpochMilli(int64(n)))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// MarshalJSON implements the json.Marshaler interface
|
||
|
func (v TimestampValue) MarshalJSON() ([]byte, error) {
|
||
|
return []byte(v.String()), nil
|
||
|
}
|
||
|
|
||
|
func timeFromEpochMilli(t int64) time.Time {
|
||
|
secs := t / 1e3
|
||
|
msec := t % 1e3
|
||
|
return time.Unix(secs, msec*int64(time.Millisecond)).UTC()
|
||
|
}
|
||
|
|
||
|
// An UUIDValue provides eventstream encoding, and representation of a UUID
|
||
|
// value.
|
||
|
type UUIDValue [16]byte
|
||
|
|
||
|
// Get returns the underlying value.
|
||
|
func (v UUIDValue) Get() interface{} {
|
||
|
return v[:]
|
||
|
}
|
||
|
|
||
|
// valueType returns the EventStream header value type value.
|
||
|
func (UUIDValue) valueType() valueType {
|
||
|
return uuidValueType
|
||
|
}
|
||
|
|
||
|
func (v UUIDValue) String() string {
|
||
|
return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:])
|
||
|
}
|
||
|
|
||
|
// encode encodes the UUIDValue into an eventstream binary value
|
||
|
// representation.
|
||
|
func (v UUIDValue) encode(w io.Writer) error {
|
||
|
raw := rawValue{
|
||
|
Type: v.valueType(),
|
||
|
}
|
||
|
|
||
|
return raw.encodeFixedSlice(w, v[:])
|
||
|
}
|
||
|
|
||
|
func (v *UUIDValue) decode(r io.Reader) error {
|
||
|
tv := (*v)[:]
|
||
|
return decodeFixedBytesValue(r, tv)
|
||
|
}
|