Skip to content

Commit

Permalink
Adds pipeline and derives default.stdin binding
Browse files Browse the repository at this point in the history
  • Loading branch information
RCHowell committed Apr 23, 2024
1 parent 23cc28c commit e21af19
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 57 deletions.
1 change: 1 addition & 0 deletions partiql-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
implementation(project(":partiql-types"))
implementation(project(":partiql-spi"))
implementation(project(":plugins:partiql-local"))
implementation(project(":plugins:partiql-memory"))
implementation(Deps.csv)
implementation(Deps.awsSdkBom)
implementation(Deps.awsSdkDynamodb)
Expand Down
167 changes: 120 additions & 47 deletions partiql-cli/src/main/kotlin/org/partiql/cli/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,29 @@

package org.partiql.cli

import com.amazon.ion.system.IonReaderBuilder
import com.amazon.ion.system.IonTextWriterBuilder
import com.amazon.ionelement.api.ionListOf
import com.amazon.ionelement.api.ionNull
import com.amazon.ionelement.api.loadAllElements
import org.partiql.cli.io.Format
import org.partiql.cli.pipeline.Pipeline
import org.partiql.cli.shell.Shell
import org.partiql.eval.PartiQLEngine
import org.partiql.eval.PartiQLResult
import org.partiql.plugins.memory.MemoryCatalog
import org.partiql.plugins.memory.MemoryConnector
import org.partiql.spi.connector.Connector
import org.partiql.spi.connector.sql.info.InfoSchema
import org.partiql.types.StaticType
import org.partiql.value.PartiQLValueExperimental
import org.partiql.value.toIon
import picocli.CommandLine
import java.io.File
import java.io.InputStream
import java.io.SequenceInputStream
import java.time.Instant
import java.util.Collections
import java.util.Properties
import kotlin.system.exitProcess

Expand Down Expand Up @@ -72,15 +92,15 @@ internal class MainCommand() : Runnable {
names = ["--strict"],
description = ["Execute in strict (type-checking) mode."],
)
var strict: Boolean? = false
var strict: Boolean = false

@CommandLine.Option(
names = ["-f", "--format"],
description = ["The data format, using the form <input>[:<output>]."],
paramLabel = "<input[:output]>",
converter = [Format.Converter::class],
)
var format: Pair<Format, Format>? = null
lateinit var format: Pair<Format, Format>

@CommandLine.Option(
names = ["-i", "--include"],
Expand Down Expand Up @@ -108,8 +128,10 @@ internal class MainCommand() : Runnable {
* Run the CLI or Shell (default)
*/
override fun run() {
val statement: String? = statement()
println(statement)
when (val statement = statement()) {
null -> shell()
else -> run(statement)
}
}

/**
Expand All @@ -122,49 +144,100 @@ internal class MainCommand() : Runnable {
return program?.first ?: include?.readText()
}

// /**
// * Runs the CLI
// */
// private fun runCli(exec: ExecutionOptions, stream: InputStream) {
// val input = when (exec.inputFile) {
// null -> EmptyInputStream()
// else -> FileInputStream(exec.inputFile!!)
// }
// val output = when (exec.outputFile) {
// null -> UnclosableOutputStream(System.out)
// else -> FileOutputStream(exec.outputFile!!)
// }
// val query = stream.readBytes().toString(Charsets.UTF_8)
// val queryLines = query.lines()
// val queryWithoutShebang = when (queryLines.firstOrNull()?.startsWith(SHEBANG_PREFIX)) {
// false -> query
// else -> queryLines.subList(1, queryLines.size).joinToString(System.lineSeparator())
// }
// input.use { src ->
// output.use { out ->
// Cli(
// ion,
// src,
// exec.inputFormat,
// out,
// exec.outputFormat,
// options.pipeline,
// options.environment,
// queryWithoutShebang,
// exec.wrapIon
// ).run()
// out.write(System.lineSeparator().toByteArray(Charsets.UTF_8))
// }
// }
// }
//
// /**
// * Runs the interactive shell
// */
// private fun runShell(shell: ShellOptions = ShellOptions()) {
// val config = Shell.ShellConfiguration(isMonochrome = shell.isMonochrome)
// Shell(System.out, options.pipeline, options.environment, config).start()
// }
private fun shell() {
val pipeline = when (strict) {
true -> Pipeline.strict()
else -> Pipeline.default()
}
Shell(pipeline).start()
}

@OptIn(PartiQLValueExperimental::class)
private fun run(statement: String) {
val pipeline = when (strict) {
true -> Pipeline.strict()
else -> Pipeline.default()
}
val program = statement.trimHashBang()
val session = session()
val result = pipeline.execute(program, session)
when (result) {
is PartiQLResult.Error -> {
error(result.cause.stackTrace)
}
is PartiQLResult.Value -> {
// TODO handle output format
val ion = result.value.toIon()
val writer = IonTextWriterBuilder.pretty().build(System.out as Appendable)
ion.writeTo(writer)
println()
}
}
}

private fun session() = Pipeline.Session(
queryId = "cli",
userId = System.getProperty("user.name"),
currentCatalog = "default",
currentDirectory = emptyList(),
connectors = connectors(),
instant = Instant.now(),
debug = false,
mode = when (strict) {
true -> PartiQLEngine.Mode.STRICT
else -> PartiQLEngine.Mode.PERMISSIVE
}
)

/**
* Produce a list of connectors
*/
private fun connectors(): Map<String, Connector> {
if (dir != null && files != null && files!!.isNotEmpty()) {
error("Cannot specify both a database directory and a list of files.")
}
if (dir != null) {
TODO("Support local directory database")
}
// Derive a `default catalog from stdin (or file streams)
var streams: List<InputStream> = files?.map { it.inputStream() } ?: emptyList()
if (streams.isEmpty() && System.`in`.available() != 0) {
streams = listOf(System.`in`)
}
val value = if (streams.isNotEmpty()) {
val stream = SequenceInputStream(Collections.enumeration(streams))
val reader = IonReaderBuilder.standard().build(stream)
val values = loadAllElements(reader).toList()
when (values.size) {
0 -> ionNull()
1 -> values.first()
else -> ionListOf(values)
}
} else {
ionNull()
}

val catalog = MemoryCatalog.builder()
.name("default")
.info(InfoSchema.ext())
.define(
name = "stdin",
type = StaticType.ANY,
value = value,
)
.build()
return mapOf(
"default" to MemoryConnector(catalog)
)
}

private fun String.trimHashBang(): String {
val lines = this.lines()
return when (lines.firstOrNull()?.startsWith(SHEBANG_PREFIX)) {
false -> this
else -> lines.subList(1, lines.size).joinToString(System.lineSeparator())
}
}

private class PairConverter : CommandLine.ITypeConverter<Pair<String?, File?>> {

Expand Down
4 changes: 2 additions & 2 deletions partiql-cli/src/main/kotlin/org/partiql/cli/io/Format.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ internal enum class Format {
private const val ENUM = "[a-z][a-z0-9_]*"
private const val PATTERN = "($ENUM)(:$ENUM)?"

override fun convert(value: String?): Pair<Format, Format>? {
override fun convert(value: String?): Pair<Format, Format> {
if (value == null) {
return null
return PARTIQL to PARTIQL
}
if (value.matches(Regex(PATTERN))) {
error("Format argument does not match $PATTERN")
Expand Down
110 changes: 110 additions & 0 deletions partiql-cli/src/main/kotlin/org/partiql/cli/pipeline/Pipeline.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.partiql.cli.pipeline

import org.partiql.ast.Statement
import org.partiql.errors.Problem
import org.partiql.errors.ProblemCallback
import org.partiql.errors.ProblemSeverity
import org.partiql.eval.PartiQLEngine
import org.partiql.eval.PartiQLResult
import org.partiql.parser.PartiQLParser
import org.partiql.plan.PartiQLPlan
import org.partiql.planner.PartiQLPlanner
import org.partiql.spi.connector.Connector
import org.partiql.spi.connector.ConnectorSession
import java.time.Instant

internal class Pipeline private constructor(
private val parser: PartiQLParser,
private val planner: PartiQLPlanner,
private val engine: PartiQLEngine,
) {

/**
* Combined planner and engine session.
*/
data class Session(
@JvmField val queryId: String,
@JvmField val userId: String,
@JvmField val currentCatalog: String,
@JvmField val currentDirectory: List<String>,
@JvmField val connectors: Map<String, Connector>,
@JvmField val instant: Instant,
@JvmField val debug: Boolean,
@JvmField val mode: PartiQLEngine.Mode,
) {

private val connector = object : ConnectorSession {
override fun getQueryId(): String = queryId
override fun getUserId(): String = userId
}

fun planner() = PartiQLPlanner.Session(
queryId = queryId,
userId = userId,
currentCatalog = currentCatalog,
currentDirectory = currentDirectory,
catalogs = connectors.mapValues { it.value.getMetadata(connector) },
instant = instant,
)

fun engine() = PartiQLEngine.Session(
connectors = connectors,
mode = mode
)
}

/**
* TODO replace with the ResultSet equivalent?
*/
fun execute(statement: String, session: Session): PartiQLResult {
val ast = parse(statement)
val plan = plan(ast, session)
return execute(plan, session)
}

private fun parse(source: String): Statement {
val result = parser.parse(source)
return result.root
}

private fun plan(statement: Statement, session: Session): PartiQLPlan {
val callback = ProblemListener()
val result = planner.plan(statement, session.planner(), callback)
val errors = callback.problems.filter { it.details.severity == ProblemSeverity.ERROR }
if (errors.isNotEmpty()) {
throw RuntimeException("Planner encountered errors: ${errors.joinToString()}")
}
return result.plan
}

private fun execute(plan: PartiQLPlan, session: Session): PartiQLResult {
val statement = engine.prepare(plan, session.engine())
return engine.execute(statement)
}

private class ProblemListener : ProblemCallback {

val problems = mutableListOf<Problem>()

override fun invoke(p1: Problem) {
problems.add(p1)
}
}

companion object {

fun default(): Pipeline {
val parser = PartiQLParser.default()
val planner = PartiQLPlanner.default()
val engine = PartiQLEngine.default()
return Pipeline(parser, planner, engine)
}

fun strict(): Pipeline {
val parser = PartiQLParser.default()
val planner = PartiQLPlanner.builder().signalMode().build()
val engine = PartiQLEngine.default()
return Pipeline(parser, planner, engine)
}
}
}
11 changes: 6 additions & 5 deletions partiql-cli/src/main/kotlin/org/partiql/cli/shell/Shell.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.jline.utils.AttributedStringBuilder
import org.jline.utils.AttributedStyle
import org.jline.utils.InfoCmp
import org.joda.time.Duration
import org.partiql.cli.pipeline.Pipeline
import java.io.Closeable
import java.io.PrintStream
import java.nio.file.Path
Expand Down Expand Up @@ -97,16 +98,16 @@ val doneCompiling = AtomicBoolean(true)
val donePrinting = AtomicBoolean(true)

internal class Shell(
private val state: State,
private val pipeline: Pipeline,
) {

class State(
@JvmField var debug: Boolean,
private var state: State = State(false)

private class State(
@JvmField var debug: Boolean
)

private val home: Path = Paths.get(System.getProperty("user.home"))
private val plugins: Path = home.resolve(".partiql").resolve("plugins")
private val user = System.getProperty("user.name")
private val out = PrintStream(System.out)

fun start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface PartiQLEngine {
}

public class Session(
val catalogs: Map<String, Connector> = mapOf(),
val connectors: Map<String, Connector> = mapOf(),
val mode: Mode = Mode.PERMISSIVE
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ internal class Symbols private constructor(private val catalogs: Array<C>) {
@JvmStatic
fun build(plan: PartiQLPlan, session: PartiQLEngine.Session): Symbols {
val catalogs = plan.catalogs.map {
val connector = session.catalogs[it.name]
val connector = session.connectors[it.name]
?: error("The plan contains a catalog `${it.name}`, but this was absent from the engine's session")
C(
name = it.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class EvalExecutor(
}

val evalSession = PartiQLEngine.Session(
catalogs = mutableMapOf(
connectors = mutableMapOf(
"default" to connector
),
mode = mode
Expand Down

0 comments on commit e21af19

Please sign in to comment.