Snippets

TargetSmart Communications MzeBRk: Untitled snippet

Created by Ben Stroud
package main

import (
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"os"

	"github.com/linkedin/goavro"
	"github.com/tlarsendataguy/goalteryx/sdk"
)

// Configuration holds the tool settings decoded (with encoding/xml) from the
// XML string returned by provider.ToolConfig().
type Configuration struct {
	// InputPath is the path of the Avro object container file to read.
	InputPath string `xml:"InputPath"`
}

// Plugin is the TargetSmart Avro reader tool: it reads records from an Avro
// object container file and writes them to its "Output" anchor.
type Plugin struct {
	// provider is the SDK handle passed to Init.
	provider sdk.Provider
	// output is the anchor named "Output", obtained from the provider in Init.
	output sdk.OutputAnchor
	// outInfo is the outgoing record layout built in OnComplete.
	outInfo *sdk.OutgoingRecordInfo
	// config holds the settings decoded from the tool's XML configuration.
	config Configuration
}

// Init is called by the SDK when the tool is created. It stores the provider
// and the "Output" anchor, decodes the tool's XML configuration into p.config,
// and checks that the configured input path names a readable Avro file.
// Problems are reported through provider.Io().Error; the records themselves
// are read later, in OnComplete.
func (p *Plugin) Init(provider sdk.Provider) {
	// Store these first: p.panic and isValidAvroFile log through p.provider,
	// and OnComplete needs p.output even if validation below fails.
	p.provider = provider
	p.output = provider.GetOutputAnchor(`Output`)

	provider.Io().Info(fmt.Sprintf(`Init TargetSmartAvroReader tool %v`, provider.Environment().ToolId()))

	if err := xml.Unmarshal([]byte(provider.ToolConfig()), &p.config); err != nil {
		provider.Io().Error(err.Error())
		return
	}

	inputPath := p.config.InputPath
	provider.Io().Info(fmt.Sprintf(`TargetSmartAvroReader input file: %s`, inputPath))

	if inputPath == "" {
		// Opening "" would only fail with an unhelpful OS error; say what is missing.
		provider.Io().Error("Avro input path is required")
		return
	}
	if !p.isValidAvroFile(inputPath) {
		provider.Io().Error("Invalid Avro file")
	}
}

// isValidAvroFile reports whether filePath can be opened as an Avro object
// container file and, if the file holds any records, whether its first record
// decodes. Instead of panicking it returns false, logging the reason through
// p.provider when a provider has been stored, so the caller decides how to react.
func (p *Plugin) isValidAvroFile(filePath string) bool {
	err := probeAvroFile(filePath)
	if err == nil {
		return true
	}
	// p.provider is nil until Init stores it; skip logging rather than
	// dereferencing a nil interface.
	if p.provider != nil {
		p.provider.Io().Error(fmt.Sprintf("TargetSmartAvroReader: %v", err))
	}
	return false
}

// probeAvroFile opens filePath, builds an OCF reader over it and decodes the
// first record (if there is one), returning the first problem encountered.
func probeAvroFile(filePath string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("error opening file %s: %w", filePath, err)
	}
	defer file.Close()

	reader, err := goavro.NewOCFReader(file)
	if err != nil {
		return fmt.Errorf("error creating Avro reader: %w", err)
	}
	if !reader.Scan() {
		// Scan returns false both when there are no records and on a read
		// failure; Err distinguishes the two (nil means simply empty).
		return reader.Err()
	}
	if _, err := reader.Read(); err != nil && err != io.EOF {
		return fmt.Errorf("error reading Avro: %w", err)
	}
	return nil
}

// OnInputConnectionOpened is a goalteryx plugin callback for incoming
// connections. This is an input tool with no incoming connections, so it
// intentionally does nothing.
func (p *Plugin) OnInputConnectionOpened(connection sdk.InputConnection) {}

// OnRecordPacket is a goalteryx plugin callback for incoming record packets.
// Nothing ever flows into this input tool, so it is a deliberate no-op.
func (p *Plugin) OnRecordPacket(connection sdk.InputConnection) {}

func (p *Plugin) panic(errmsg string, err error) {
	errmsg = fmt.Sprintf("TargetSmartAvroReader: %s", errmsg)
	p.provider.Io().Error(errmsg)
	if err != nil {
		panic(err)
	}
	panic(errmsg)
}

// OnComplete does this input tool's work when the SDK signals completion.
// It opens the configured Avro object container file and derives Alteryx output
// fields from the file's schema (via getAvroFields). It then writes every
// record to the "Output" anchor, stopping early if the anchor is closed.
// Any failure is fatal and is reported through p.panic.
func (p *Plugin) OnComplete() {
	inputPath := p.config.InputPath
	if inputPath == "" {
		p.panic("Avro input path is required", nil)
	}

	avroFile, err := os.Open(inputPath)
	if err != nil {
		p.panic(fmt.Sprintf("Failed to open input file: %s", inputPath), err)
	}
	defer avroFile.Close()

	avroReader, err := goavro.NewOCFReader(avroFile)
	if err != nil {
		p.panic("Failed to create Avro reader", err)
	}

	fields := getAvroFields(avroReader.Codec().Schema())
	if len(fields) == 0 {
		// getAvroFields yields nothing when it cannot decode the schema; writing
		// zero-column records would silently hide that.
		p.panic("No readable fields found in the Avro schema", nil)
	}

	outputFields := make([]sdk.NewOutgoingField, 0, len(fields))
	for _, field := range fields {
		outField, supported := avroOutgoingField(field, inputPath)
		if !supported {
			p.panic(fmt.Sprintf("Type %s is not yet supported by this plugin.", field.Type), nil)
		}
		outputFields = append(outputFields, outField)
	}

	// The second result of NewOutgoingRecordInfo is deliberately ignored.
	// NOTE(review): confirm against the goalteryx SDK whether it can report
	// problems (e.g. renamed fields) that should be surfaced to the user.
	outRecInfo, _ := sdk.NewOutgoingRecordInfo(outputFields)
	p.outInfo = outRecInfo

	if !p.output.IsOpen() {
		p.output.Open(outRecInfo)
	}

	var written uint
	for avroReader.Scan() {
		if !p.output.IsOpen() {
			break
		}
		datum, err := avroReader.Read()
		if err != nil {
			p.panic("Failed to read record", err)
		}
		record, isRecord := datum.(map[string]interface{})
		if !isRecord {
			p.panic(fmt.Sprintf("Unexpected Avro datum of Go type %T; expected a record", datum), nil)
		}
		for _, field := range fields {
			p.setAvroValue(outRecInfo, field, avroUnionValue(record[field.Name]))
		}
		p.output.Write()
		written++
	}
	// Scan returns false both at end of data and on a read failure; only Err
	// tells them apart, so check it before declaring success.
	if err := avroReader.Err(); err != nil {
		p.panic("Failed to read Avro file", err)
	}

	if p.output.IsOpen() {
		p.output.Close()
	}
	p.provider.Io().Info(fmt.Sprintf(`TargetSmartAvroReader: wrote %d records`, written))
	p.provider.Io().Info(`Done`)
}

// avroOutgoingField returns the Alteryx output field definition for an Avro
// field, named after the field and with source as its source string. It
// reports false for Avro types this plugin does not support. "null" fields
// become V_WString columns because they only ever hold NULL.
func avroOutgoingField(field AvroField, source string) (sdk.NewOutgoingField, bool) {
	switch field.Type {
	case "string", "null":
		return sdk.NewV_WStringField(field.Name, source, 999999), true
	case "boolean":
		return sdk.NewBoolField(field.Name, source), true
	case "float":
		return sdk.NewFloatField(field.Name, source), true
	case "double":
		return sdk.NewDoubleField(field.Name, source), true
	case "long":
		return sdk.NewInt64Field(field.Name, source), true
	case "int":
		return sdk.NewInt32Field(field.Name, source), true
	}
	var unsupported sdk.NewOutgoingField
	return unsupported, false
}

// avroUnionValue unwraps a value decoded by goavro from a union-typed field.
// goavro represents a non-null union value as a single-entry map keyed by the
// branch type name (e.g. {"string": "abc"}) and a null value as nil. Values
// that are not such a map are returned unchanged, and an empty map yields nil.
func avroUnionValue(raw interface{}) interface{} {
	branch, isUnion := raw.(map[string]interface{})
	if !isUnion {
		return raw
	}
	for _, v := range branch {
		return v
	}
	return nil
}

// setAvroValue stores one decoded value into the outgoing field for field,
// writing NULL when value is nil. It panics (via p.panic) on unsupported types
// and when the decoded Go type does not match the schema type.
func (p *Plugin) setAvroValue(out *sdk.OutgoingRecordInfo, field AvroField, value interface{}) {
	if value == nil {
		switch field.Type {
		case "string", "null":
			out.StringFields[field.Name].SetNull()
		case "boolean":
			out.BoolFields[field.Name].SetNull()
		case "float", "double":
			out.FloatFields[field.Name].SetNull()
		case "int", "long":
			out.IntFields[field.Name].SetNull()
		default:
			p.panic(fmt.Sprintf("Type error for field %s with nil value. Type %s is not yet supported by this plugin.", field.Name, field.Type), nil)
		}
		return
	}

	ok := true
	switch field.Type {
	case "string":
		var s string
		if s, ok = value.(string); ok {
			out.StringFields[field.Name].SetString(s)
		}
	case "boolean":
		var b bool
		if b, ok = value.(bool); ok {
			out.BoolFields[field.Name].SetBool(b)
		}
	case "float", "double":
		// goavro decodes Avro "float" as float32 and "double" as float64.
		switch v := value.(type) {
		case float32:
			out.FloatFields[field.Name].SetFloat(float64(v))
		case float64:
			out.FloatFields[field.Name].SetFloat(v)
		default:
			ok = false
		}
	case "int", "long":
		// goavro decodes Avro "int" as int32 and "long" as int64.
		switch v := value.(type) {
		case int32:
			out.IntFields[field.Name].SetInt(int(v))
		case int64:
			out.IntFields[field.Name].SetInt(int(v))
		default:
			ok = false
		}
	case "null":
		out.StringFields[field.Name].SetNull()
	default:
		p.panic(fmt.Sprintf("Type error for field %s with non-nil value. Type %s is not yet supported by this plugin.", field.Name, field.Type), nil)
	}
	if !ok {
		p.panic(fmt.Sprintf("Field %s: expected a value of Avro type %s but decoded Go type %T", field.Name, field.Type, value), nil)
	}
}

// AvroField describes one field of an Avro record schema, as extracted by
// getAvroFields.
type AvroField struct {
	// Name is the field name from the schema.
	Name string
	// Type is the first non-"null" member of the field's type union, or
	// "null" when the union has no other member.
	Type string
	// IsNullable reports whether "null" is one of the union's members.
	IsNullable bool
}

// getAvroFields parses a JSON Avro record schema and returns its fields in
// declaration order.
//
// Only fields whose "type" is a JSON array of type names (a union such as
// ["null", "string"]) are understood. If any field's type has another form
// (a bare "string", a nested object, ...), the schema does not decode and
// nil is returned. "null" may appear at any position in the union. A field
// with an empty union gets an empty Type.
func getAvroFields(jsonSchema string) []AvroField {
	type schemaField struct {
		Default interface{} `json:"default"`
		Name    string      `json:"name"`
		Type    []string    `json:"type"`
	}
	type recordSchema struct {
		Fields    []schemaField `json:"fields"`
		Name      string        `json:"name"`
		Namespace string        `json:"namespace"`
		Type      string        `json:"type"`
	}

	var schema recordSchema
	if err := json.Unmarshal([]byte(jsonSchema), &schema); err != nil {
		return nil
	}

	fields := make([]AvroField, 0, len(schema.Fields))
	for _, f := range schema.Fields {
		field := AvroField{Name: f.Name}
		for _, member := range f.Type {
			switch {
			case member == "null":
				field.IsNullable = true
			case field.Type == "":
				field.Type = member
			}
		}
		// A union made only of "null" (e.g. ["null"]) can hold nothing else.
		if field.Type == "" && field.IsNullable {
			field.Type = "null"
		}
		fields = append(fields, field)
	}
	return fields
}

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.