
Samza SQL


Overview

Samza SQL allows you to define your stream processing logic declaratively as a SQL query. This allows you to create streaming pipelines without Java code or configuration, unless you require user-defined functions (UDFs). Internally, Samza SQL uses Apache Calcite for SQL parsing and query planning. The query is automatically translated to Samza’s high-level API and runs on Samza’s execution engine.

You can run Samza SQL locally on your machine or on a YARN cluster.

Running Samza SQL on your local machine

The Samza SQL console allows you to experiment with Samza SQL locally on your machine.

Setup Kafka

Follow the instructions from the Kafka quickstart to start the ZooKeeper and Kafka servers.

Let us create a Kafka topic named “ProfileChangeStream” for this demo.

> ./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ProfileChangeStream

Download the Samza tools package from here and use the generate-kafka-events script to populate the stream with sample data.

> cd samza-tools-<version>
> ./scripts/generate-kafka-events.sh -t ProfileChangeStream -e ProfileChange

Using the Samza SQL Console

The simplest query reads all events from the Kafka topic ProfileChangeStream and prints them to the console.

> ./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select * from kafka.ProfileChangeStream"

Next, let us project a few fields from the input stream.

> ./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select Name, OldCompany, NewCompany from kafka.ProfileChangeStream"
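To make the effect of a projection concrete, here is a conceptual sketch in plain Java of what the query above does to each message: every input event produces one output row containing only the selected columns. This is not Samza's API. ProfileChange, its fields (including the extra "title" field the projection drops) and project() are hypothetical names chosen for illustration, and the sample values are made up.

```java
import java.util.List;
import java.util.stream.Collectors;

// Conceptual sketch: what a projection such as
//   select Name, OldCompany, NewCompany from kafka.ProfileChangeStream
// does to each message. This is NOT Samza's API; ProfileChange, its
// fields and project() are hypothetical names for illustration.
public class ProjectionSketch {

    // Hypothetical in-memory stand-in for one ProfileChange event.
    static final class ProfileChange {
        final String name;
        final String oldCompany;
        final String newCompany;
        final String title; // hypothetical extra field that the projection drops

        ProfileChange(String name, String oldCompany, String newCompany, String title) {
            this.name = name;
            this.oldCompany = oldCompany;
            this.newCompany = newCompany;
            this.title = title;
        }
    }

    // Emits one output row per input message, keeping only the selected
    // columns in SELECT-list order.
    static List<List<String>> project(List<ProfileChange> events) {
        return events.stream()
                .map(e -> List.of(e.name, e.oldCompany, e.newCompany))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ProfileChange> events = List.of(
                new ProfileChange("Alice", "Microsoft", "LinkedIn", "Engineer"),
                new ProfileChange("Bob", "Oracle", "Google", "Manager"));
        // prints [[Alice, Microsoft, LinkedIn], [Bob, Oracle, Google]]
        System.out.println(project(events));
    }
}
```

Because each row depends only on its own input message, a projection needs no state, which is why it falls within what Samza SQL currently supports.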

You can also filter messages in the input stream based on a predicate. In this example, we keep only profiles whose new employer is LinkedIn, and for each of them we also output whether the previous employer matches the regex .*soft. The function RegexMatch(regex, company) is an example of a UDF that evaluates a predicate; here its result is returned as an output column.

> ./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select Name as __key__, Name, NewCompany, RegexMatch('.*soft', OldCompany) from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"
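The following plain-Java sketch mirrors the per-message logic of the query above: the WHERE clause drops events whose NewCompany is not LinkedIn, and RegexMatch is evaluated on OldCompany for each surviving event. It is a hypothetical illustration only: regexMatch() does not implement Samza's UDF interface, the __key__ alias is not modeled, and whole-string matching (as in java.util.regex.Pattern.matches) is an assumption about how RegexMatch behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Conceptual sketch of the per-message logic of
//   select Name as __key__, Name, NewCompany, RegexMatch('.*soft', OldCompany)
//   from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'
// regexMatch() is a hypothetical plain-Java stand-in and does NOT implement
// Samza's UDF interface; the __key__ alias is not modeled here.
public class FilterSketch {

    // Hypothetical stand-in for one ProfileChange event.
    static final class ProfileChange {
        final String name;
        final String oldCompany;
        final String newCompany;

        ProfileChange(String name, String oldCompany, String newCompany) {
            this.name = name;
            this.oldCompany = oldCompany;
            this.newCompany = newCompany;
        }
    }

    // Assumption: whole-string matching (Pattern.matches), so ".*soft"
    // matches "Microsoft" but not "Oracle". A null value never matches.
    static boolean regexMatch(String regex, String value) {
        return value != null && Pattern.matches(regex, value);
    }

    // Returns {Name, NewCompany, RegexMatch('.*soft', OldCompany)} for each
    // message that passes the WHERE clause; other messages are dropped.
    static List<List<Object>> run(List<ProfileChange> events) {
        List<List<Object>> out = new ArrayList<>();
        for (ProfileChange e : events) {
            if ("LinkedIn".equals(e.newCompany)) { // where NewCompany = 'LinkedIn'
                out.add(List.<Object>of(e.name, e.newCompany, regexMatch(".*soft", e.oldCompany)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<ProfileChange> events = List.of(
                new ProfileChange("Alice", "Microsoft", "LinkedIn"),
                new ProfileChange("Bob", "Oracle", "LinkedIn"),
                new ProfileChange("Carol", "Microsoft", "Google"));
        // prints [[Alice, LinkedIn, true], [Bob, LinkedIn, false]]
        System.out.println(run(events));
    }
}
```

Carol is dropped by the WHERE clause, while Bob is kept with a false RegexMatch column, which shows the difference between filtering on a predicate and projecting its value.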

Running Samza SQL on YARN

The hello-samza project has examples to get started with Samza on YARN. You can define your SQL query in a configuration file and submit it to a YARN cluster.

> ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/page-view-filter-sql.properties

SQL Grammar

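Samza SQL’s grammar is a subset of the grammar supported by Calcite’s SQL parser. The grammar includes constructs such as GROUP BY and JOIN that Samza SQL does not yet execute (see Known Limitations).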

statement:
  insert
  | query

query:
  values
  | select

insert:
      ( INSERT | UPSERT ) INTO tablePrimary
      [ '(' column [, column ]* ')' ]
      query 

select:
  SELECT
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName

values:
  VALUES expression [, expression ]*

Known Limitations

Samza SQL only supports simple stateless queries including selections and projections. We are actively working on supporting stateful operations such as aggregations, windows and joins.
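To illustrate the distinction behind this limitation, the sketch below contrasts a stateless check, whose result depends only on the current message (like a WHERE filter), with a stateful running count per company (roughly what a GROUP BY with COUNT(*) would need), which must remember earlier messages. This is not Samza code; StateSketch, isLinkedInHire() and countHire() are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of stateless vs stateful processing. Not Samza code;
// the class and method names are hypothetical.
public class StateSketch {

    // Stateless: depends only on the current message, so each message
    // can be processed independently (like a WHERE filter).
    static boolean isLinkedInHire(String newCompany) {
        return "LinkedIn".equals(newCompany);
    }

    // Stateful: a running count per company must remember what earlier
    // messages contained (roughly a GROUP BY NewCompany with COUNT(*)).
    private final Map<String, Long> countsByCompany = new HashMap<>();

    // Increments and returns the count seen so far for this company.
    long countHire(String newCompany) {
        return countsByCompany.merge(newCompany, 1L, Long::sum);
    }

    public static void main(String[] args) {
        StateSketch s = new StateSketch();
        System.out.println(isLinkedInHire("LinkedIn")); // prints true
        s.countHire("LinkedIn");
        System.out.println(s.countHire("LinkedIn"));    // prints 2
    }
}
```

In a distributed stream processor, the map in the stateful case would have to be stored durably and recovered on failure, which is the extra machinery that aggregations, windows and joins require.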