Continuous Query
In summary, a CQL statement is like a SQL one, so the continuous query consists of a select, a from, a where, a goup and a having part.
We use the following example to explain basic details of CQL-Query.
SELECT auction, AVG(price) AS aprice FROM bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME] WHERE auction > 10 GROUP BY auction HAVING aprice<100.0
Select
SELECT auction, AVG(price) AS aprice...
From
... FROM bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME]...
The most different parts between usual SQL and CQL is the FROM part, because you have the possibility to definie windows. CQL defines them by squared brackets.
There are 3 different windows you can define: an unbounded window, a time-based window and a tuple-based window.
An unbounded window is defined by the keyword UNBOUNDED. It sets the end timestamp to infinite. Example:
SELECT * FROM bid [UNBOUNDED]
Since an unbounded window does not limit the validity of a stream element (in fact it is not really a window), the declaration of UNBOUNDED is optional. You get the same result without a window declaration. You can use the UNBOUNDED keyword to highlight that no window is defined.
A time-based window is defined by the size of the window as time span and an optional advance parameter. The latter defines after what time span the window should move (if no unit is declared, it has the same unit as size). Additional, a partition attribute can be defined. Syntax:
SELECT * FROM <source> [SIZE <size> <unit> TIME] SELECT * FROM <source> [SIZE <size> <unit> ADVANCE <advance size> TIME] SELECT * FROM <source> [SIZE <size> <unit> ADVANCE <advance size> <unit> TIME] SELECT * FROM <source> [SIZE <size> <unit> TIME PARTITION BY bid.auction] SELECT * FROM <source> [SIZE <size> <unit> ADVANCE <advance size> TIME PARTITION BY <partition attribute>] SELECT * FROM <source> [SIZE <size> <unit> ADVANCE <advance size> <unit> TIME PARTITION BY <partition attribute>]
For valid values for <unit> see TimeWindow.
A tuple-based window is defined by the size of the window as number of tuples and an optional advance parameter. The latter defines after how many tuples the window should move. Additional, a partition attribute can be defined. Syntax:
SELECT * FROM <source> [SIZE <size> TUPLE] SELECT * FROM <source> [SIZE <size> ADVANCE <advance size> TUPLE] SELECT * FROM <source> [SIZE <size> TUPLE PARTITION BY bid.auction] SELECT * FROM <source> [SIZE <size> ADVANCE <advance size> TUPLE PARTITION BY <partition attribute>]
Futher information about windows can be found here
Where
... WHERE auction > 10 ...
Group By and Having
... GROUP BY auction HAVING aprice<100.0
Stream To
If you want to stream your results into a sink, you first have to create a sink.
STREAM TO writeout SELECT * FROM nexmark:person WHERE...
This example would push all data that is produced by "SELECT * FROM nexmark:person WHERE..." into the sink named writeout, which is a file-writer in our case (see above).
Examples
Here are some language examples what can be used in the select-part of a CQL-Statement
#PARSER CQL #TRANSCFG Standard #DOREWRITE false #QUERY DROP STREAM bid IF EXISTS #QUERY ATTACH STREAM bid (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE) CHANNEL localhost : 65442 #QUERY DROP STREAM person IF EXISTS #QUERY ATTACH STREAM person (timestamp STARTTIMESTAMP,id INTEGER,name STRING,email STRING,creditcard STRING,city STRING,state STRING) CHANNEL localhost : 65440 /// SIMPLE PROJECTS #QUERY SELECT * FROM bid #QUERY SELECT bid.* FROM bid #QUERY SELECT price FROM bid #QUERY SELECT bidder, price FROM bid #QUERY SELECT timestamp, auction, bidder, datetime, price FROM bid /// PROJECTS WITH RENAMED SOURCE BUT NO USE IN PROJECT #QUERY SELECT * FROM bid AS b #QUERY SELECT bid.* FROM bid AS b #QUERY SELECT price FROM bid AS b #QUERY SELECT bidder, price FROM bid AS b #QUERY SELECT timestamp, auction, bidder, datetime, price FROM bid AS b /// PROJECTS WITH RENAMED SOURCE WITH USE IN PROJECT #QUERY SELECT * FROM bid AS b #QUERY SELECT b.* FROM bid AS b #QUERY SELECT b.price FROM bid AS b #QUERY SELECT b.bidder, b.price FROM bid AS b #QUERY SELECT b.timestamp, b.auction, b.bidder, b.datetime, b.price FROM bid AS b /// PROJECTS WITH RENAMED ATTRIBUTES #QUERY SELECT price AS p FROM bid #QUERY SELECT price AS p, bidder FROM bid #QUERY SELECT price AS p, bidder AS b FROM bid /// PROJECTS WITH RENAMED ATTRIBUTES AND SOURCES #QUERY SELECT price AS p FROM bid AS b #QUERY SELECT b.price AS p FROM bid AS b #QUERY SELECT b.price AS p, b.bidder FROM bid AS b #QUERY SELECT b.price AS p, b.bidder AS b FROM bid AS b /// PROJECTS WITH CONSTANTS, FUNCTIONS AND EXPRESSIONS #QUERY SELECT bidder + price AS d FROM bid #QUERY SELECT 123.4 * price AS d FROM bid #QUERY SELECT 123.4 AS d FROM bid #QUERY SELECT 123.4 AS d, price FROM bid #QUERY SELECT DolToEur(price) AS d FROM bid #QUERY SELECT DolToEur(price) AS d, price FROM bid #QUERY SELECT DolToEur(price) * auction AS d FROM bid #QUERY SELECT DolToEur(price) * price AS d FROM bid #QUERY SELECT DolToEur(price) * auction AS d, price FROM bid #QUERY SELECT DolToEur(123.4) AS d FROM bid #QUERY SELECT 'test' AS s FROM bid #QUERY SELECT 'test' AS s, 123.4 AS d FROM bid #QUERY SELECT 'test' AS s, 123.4 AS d, price FROM bid /// PROJECTS AND SELECTS WITH RENAMED SOURCE WITH USE IN SELECT #QUERY SELECT * FROM bid AS b WHERE b.bidder > 10 #QUERY SELECT b.* FROM bid AS b WHERE b.bidder > 10 #QUERY SELECT b.price FROM bid AS b WHERE b.price < 150.0 #QUERY SELECT b.bidder, b.price FROM bid AS b WHERE b.bidder = 1 #QUERY SELECT b.timestamp, b.auction, b.bidder, b.datetime, b.price FROM bid AS b WHERE b.price > 100.0 /// PROJECTS WITH RENAMED ATTRIBUTES #QUERY SELECT price AS p FROM bid WHERE p < 100 #QUERY SELECT price AS p, bidder FROM bid WHERE p > 100 #QUERY SELECT price AS p, bidder AS b FROM bid WHERE b=1 AND p <100 /// AGGREGATES ARE NOT HANDLED LIKE FUNCTIONS AND MAY HAVE A GROUPING ETC. #QUERY SELECT AVG(price) AS aprice FROM bid #QUERY SELECT AVG(price) AS aprice FROM bid GROUP BY auction #QUERY SELECT auction, AVG(price) AS aprice FROM bid GROUP BY auction #QUERY SELECT auction, AVG(price) AS aprice FROM bid GROUP BY auction HAVING aprice<100.0 /// JOINS AND SELFJOINS #QUERY SELECT auction, bidder FROM bid, person WHERE bid.bidder = person.id #QUERY SELECT auction FROM bid AS b, person AS p WHERE b.bidder = p.id #QUERY SELECT auction AS a, bidder AS b FROM bid, person WHERE a = b #QUERY SELECT auction, bidder FROM bid, person WHERE bid.bidder = person.id #QUERY SELECT left.* FROM bid AS left, bid AS right WHERE left.bidder = right.bidder #QUERY SELECT a.auction AS aid FROM bid AS a, bid AS b WHERE aid=b.auction
And here are examples, based on nexmark, that are more complex
#PARSER CQL #TRANSCFG Standard #DROPALLQUERIES #DROPALLSOURCES #QUERY CREATE STREAM nexmark:person (timestamp STARTTIMESTAMP, id INTEGER, name STRING, email STRING, creditcard STRING, city STRING, state STRING) CHANNEL localhost : 65440 #QUERY CREATE STREAM nexmark:bid (timestamp STARTTIMESTAMP, auction INTEGER, bidder INTEGER, datetime LONG, price DOUBLE) CHANNEL localhost : 65442 #QUERY CREATE STREAM nexmark:auction (timestamp STARTTIMESTAMP, id INTEGER, itemname STRING, description STRING, initialbid INTEGER, reserve INTEGER, expires LONG, seller INTEGER, category INTEGER) CHANNEL localhost : 65441 #QUERY CREATE STREAM nexmark:category (id INTEGER, name STRING, description STRING, parentid INTEGER) CHANNEL localhost : 65443 #PARSER CQL #TRANSCFG Standard #DROPALLQUERIES /// Query 1: Currency Conversion #QNAME Nexmark:Q1 #ADDQUERY SELECT auction, DolToEur(price) AS euro, bidder, datetime FROM nexmark:bid [UNBOUNDED]; ///Query 2: Selection #QNAME Nexmark:Q2 #ADDQUERY SELECT auction, price FROM nexmark:bid WHERE auction=7 OR auction=20 OR auction=21 OR auction=59 OR auction=87; ///Query 3: Local Item Suggestion #QNAME Nexmark:Q3 #ADDQUERY SELECT p.name, p.city, p.state, a.id FROM nexmark:auction [UNBOUNDED] AS a, nexmark:person [UNBOUNDED] AS p WHERE a.seller=p.id AND (p.state='Oregon' OR p.state='Idaho' OR p.state='California') AND a.category = 10; ///Query 4: Average Price for a Category #QNAME Nexmark:Q4 #ADDQUERY SELECT AVG(q.final) FROM nexmark:category [UNBOUNDED] AS c, (SELECT MAX(b.price) AS final, a.category FROM nexmark:auction [UNBOUNDED] AS a, nexmark:bid [UNBOUNDED] AS b WHERE a.id = b.auction AND b.datetime < a.expires AND a.expires < Now() GROUP BY a.id, a.category) AS q WHERE q.category = c.id GROUP BY c.id; ///Query 5: Hot Items #QNAME Nexmark:Q5 #ADDQUERY SELECT b2.auction FROM (SELECT b1.auction, COUNT(auction) AS num FROM nexmark:bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME] AS b1 GROUP BY b1.auction ) AS b2 WHERE num >= ALL (SELECT count(auction) AS c FROM nexmark:bid [SIZE 60 MINUTES ADVANCE 1 MINUTE TIME] AS b2 GROUP bY b2.auction) ///Query 6: Average Selling Price by Seller #QNAME Nexmark:Q6 #ADDQUERY SELECT AVG(Q.final) AS s, Q.seller FROM ( SELECT MAX(B.price) AS final, A.seller FROM nexmark:auction [UNBOUNDED] AS A , nexmark:bid [UNBOUNDED] AS B WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < ${NOW} GROUP BY A.id, A.seller) [SIZE 10 TUPLE PARTITION BY A.seller] AS Q GROUP BY Q.seller; ///Query 7: Monitor New Users #QNAME Nexmark:Q7 #ADDQUERY SELECT p.id, p.name, a.reserve FROM nexmark:person [SIZE 12 HOURS ADVANCE 1 TIME] AS p, nexmark:auction [SIZE 12 HOURS ADVANCE 1 TIME] AS a WHERE p.id = a.seller;