Querying and Streaming from Google BigQuery
The Data Streaming Connector allows you to run SQL queries against your Google BigQuery dataset and stream the query results to TigerGraph’s internal Kafka server under a specified topic. You can then create and run a Kafka loading job to load data from Kafka into your graphs.
1. Prerequisites
- You should have one of the following authentication credentials:
  - Google Service Account credentials
  - Access and refresh tokens
2. Procedure
2.1. Specify connector configurations
The connector configurations provide the following information:
- Connector class
- Connection URL
- Value converter
- SQL query statement
- Connector properties
2.1.1. Specify connector class
The connector class indicates what type of connector the configuration file is used to create. The connector class is specified by the connector.class key. For streaming from Google BigQuery, the class is io.confluent.connect.jdbc.JdbcSourceConnector:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
2.1.2. Specify connection URL
To stream data from BigQuery, Data Streaming Connector uses the BigQuery JDBC Driver.
The connection URL requires the domain name or IP address of the BigQuery server (https://www.googleapis.com/bigquery/v2), the port number, the ID of your BigQuery project, and your credentials for authentication.
You can also supply other optional properties to the connection that are supported by the BigQuery JDBC connector:
connection.url=jdbc:bigquery://https://www.googleapis.com/bigquery/v2:<port_number>;<property1>=<value1>;<property2>=<value2>;... (1)
1 | Each property-value pair is separated by a semicolon (;). |
The following is a list of required or frequently used properties:
Property | Description |
---|---|
ProjectId | ID of the Google Cloud project for your BigQuery server. Required. |
OAuthType | A number that specifies the type of authentication used by the connector: 0 for a Google Service Account, 2 for pre-generated access and refresh tokens. Required. |
OAuthServiceAcctEmail | Google Service Account email used to authenticate the connection. Required if you set OAuthType to 0. |
OAuthPvtKeyPath | Path to the key file that is used to authenticate the service account email address. This parameter supports keys in .p12 or .json format. Required if you set OAuthType to 0. |
OAuthAccessToken | Access token of your Google Service Account. See the Google Cloud documentation to generate an access token. Required if you set OAuthType to 2. |
OAuthRefreshToken | Refresh token of your Google Service Account. See the Google Cloud documentation to generate a refresh token. Required if you set OAuthType to 2. |
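For example, a connection URL that authenticates with a Google Service Account might look like the following sketch. The property names follow the BigQuery JDBC driver's conventions, and the project ID, service account email, and key file path are placeholders you must replace with your own values:
connection.url=jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<your_project_id>;OAuthType=0;OAuthServiceAcctEmail=<service_account_email>;OAuthPvtKeyPath=/path/to/key.json;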
2.1.3. Value converter
BigQuery stores data in a proprietary columnar format. In order for TigerGraph to ingest the data, Data Streaming Connector uses a value converter to convert the data into the CSV format. To do this, add a line in the configuration file that specifies the value converter:
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphCsvConverter
By default, CSV values are separated by commas. To use a different separator, set the value.converter.csv.separator configuration. For example, the following converter configurations use | as the separator. If you change the separator, make sure to use the same separator when you create the loading job.
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphCsvConverter
value.converter.csv.separator=|
2.1.4. Subsection configurations
Subsection configurations are specified after the overall connector configurations. Each set of subsection configurations creates an instance of a connector by specifying a connector name and a Kafka topic that the connector streams messages to.
When streaming from BigQuery, each subsection also must have a SQL query statement used to query your BigQuery warehouse. The results of the query are streamed to the corresponding topic.
If you are querying STRUCT data or arrays, see Querying STRUCT data and arrays.
BYTES type values are automatically converted to UTF-8 strings.
For example, the following subsection configurations create two instances of the connector, each streaming the results of their corresponding queries to topic_0 and topic_1:
[bq_query_0]
name="bq_query_0"
topic="topic_0"
query="SELECT * FROM `bigquery-public-data.breathe.arxiv` LIMIT 1000"
[bq_query_1]
name="bq_query_1"
topic="topic_1"
query="SELECT doi,abstract,authors FROM `bigquery-public-data.breathe.nature`"
2.2. Create data source
Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:
- Create a data source configuration file. The only required field is the address of the broker. The broker address consists of the private IP of your TigerGraph server and the port of TigerGraph’s internal Kafka server:
  - If you don’t know your private IP, you can retrieve it by running gmyip in the bash terminal.
  - To retrieve the port of your Kafka cluster, run gadmin config get Kafka.Port. The default port is 30002.
  In the kafka_config field, you can provide additional configurations from librdkafka Global Configurations. For example, you can set group.id to tigergraph to specify the group that this consumer belongs to (see the sketch after this procedure):
  {
    "broker":"10.128.0.240:30002", (1)
    "kafka_config": {
      "session.timeout.ms":"20000"
    }
  }
  1 | Make sure to use the internal IP of the machine instead of localhost for the hostname of the broker. localhost only works for single-server instances. |
- Run CREATE DATA_SOURCE to create the data source and assign the configuration file to the data source you just created:
  CREATE DATA_SOURCE KAFKA <datasource_name> FOR GRAPH <graph_name> SET <datasource_name> = <path_to_datasource_config>
For example:
CREATE DATA_SOURCE KAFKA k1 FOR GRAPH Social SET k1 = "/home/tigergraph/social/k1.config"
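Returning to the data source configuration file from the first step, a minimal sketch that also sets the consumer group described above might look like this; any other key from librdkafka’s global configuration can be added to kafka_config in the same way:
{
  "broker":"10.128.0.240:30002",
  "kafka_config": {
    "group.id":"tigergraph",
    "session.timeout.ms":"20000"
  }
}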
2.3. Create loading job
Create a loading job to load data from the data source:
- Create a topic-partition configuration for each topic (a concrete example follows below):
  {
    "topic": <topic_name>, (1)
    "partition_list": [ (2)
      {
        "start_offset": <offset_value>, (3)
        "partition": <partition_number> (4)
      },
      {
        "start_offset": <offset_value>, (3)
        "partition": <partition_number> (4)
      }
      ...
    ]
  }
  1 | Replace <topic_name> with the name of the topic this configuration applies to. This must be one of the topics you configured in the connector configuration step. |
  2 | List of partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or partition_list isn’t included, all partitions are used with the default offset. |
  3 | Replace <offset_value> with the offset to start loading from. The default offset for loading is -1, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, the start_offset value should be -2. |
  4 | Replace <partition_number> with the number of the partition you want to configure. |
- Create a loading job and map data to the graph. See the Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data for how to create a loading job for JSON data.
Known bug: to use the default offset of -1, delete the start_offset key instead of setting it to -1.
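For example, a topic-partition configuration that streams topic_0 from the earlier connector example, starting from the beginning of partition 0, might look like this sketch:
{
  "topic": "topic_0",
  "partition_list": [
    {
      "start_offset": -2,
      "partition": 0
    }
  ]
}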
2.4. Run loading job
Running the loading job created in the last step loads the streamed data into the graph. If you make changes to the topic-partition configuration file, you can overwrite the values of the filename variables with the USING clause.
GSQL > RUN LOADING JOB load_person
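For instance, assuming the loading job defines a filename variable f1 (as in the example at the end of this page), a run that points it at an updated topic-partition configuration file might look like the following sketch:
GSQL > RUN LOADING JOB load_person USING f1="$k1:<path_to_updated_topic_partition_config>"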
By default, loading jobs that use Kafka data sources run in streaming mode and do not stop until manually aborted. As data is streamed from the data source, the running loading job will continuously ingest the streamed data into the graph store.
3. Querying STRUCT data and arrays
If the record being queried in your BigQuery warehouse contains STRUCT data or arrays, conversion functions need to be applied to the SQL statement:
3.1. STRUCT data
When retrieving struct data, it is recommended to retrieve the fields of the data directly. For example:
SELECT basic.age, basic.gender FROM `project.dataset.table`
If you want to retrieve the entire STRUCT, first use TO_JSON_STRING() so that the STRUCT data appears as JSON strings in the converted CSV stream. For example, SELECT TO_JSON_STRING(col) FROM table.
After converting the data into string format, flatten the JSON strings when loading the data into the graph.
3.2. Arrays
To load array values, apply the function ARRAY_TO_STRING to the columns of ARRAY type. For example, SELECT ARRAY_TO_STRING(col_arr, separator) FROM table. Make sure that the separator used here is distinct from the separator in your CSV streams.
After converting the array to strings, the string representation of the arrays will be in the CSV streams. You can then load the data in CSV as a list.
4. Example
For example, suppose we have the following source table in BigQuery and the following graph schema in TigerGraph:
name: String | basic: Struct | tags: Array<String> | state: String |
---|---|---|---|
Tom | {"age":40, "gender":"male"} | ["tall","strong"] | ca |
Dan | {"age":35, "gender":"female"} | ["smart","blonde"] | ny |
CREATE VERTEX person (PRIMARY_ID name STRING, name STRING, age INT, gender STRING, state STRING, tags LIST<STRING>)
CREATE GRAPH social (person)
The following configuration invokes the provided query on the source table and converts the results to CSV format:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url="jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;OAuthType=2;ProjectId=tigergraph;OAuthAccessToken=xxx;OAuthRefreshToken=yyy;OAuthClientId=zzz;OAuthClientSecret=sss;LargeResultDataset=target_dataset;LargeResultTable=target_table;"
mode=bulk
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphCsvConverter
[bq_query_0]
name="bq_query_0"
topic="person-demo"
query="SELECT name, basic.age, basic.gender, state, ARRAY_TO_STRING(tags,'#') FROM `project.dataset.table`"
The following is the converted CSV stream:
Tom,40,male,ca,tall#strong
Dan,35,female,ny,smart#blonde
The next step is to create the data source. The following data source configuration file can be used to create the data source:
{
"broker":"10.128.0.240:30002", (1)
}
1 | The IP address is the internal network IP address of the server running TigerGraph. |
Run the following command to create the data source k1:
CREATE DATA_SOURCE KAFKA k1 FOR GRAPH social
The next step is to create the topic-partition configurations to use in the loading job. In this case, the data comes from the person-demo topic in TigerGraph’s internal Kafka cluster.
{
"topic": "person-demo",
"partition_list": [
{
"start_offset": -2,
"partition": 0
}
]
}
CREATE LOADING JOB load_person FOR GRAPH social {
DEFINE FILENAME f1 = "$k1:/home/mydata/topic_person.json";
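  // $0 (the name column) is used both as the PRIMARY_ID and the name attribute;
  // SPLIT($4,"#") converts the #-separated string produced by ARRAY_TO_STRING back into a LIST<STRING>.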
LOAD f1 TO VERTEX person VALUES ($0, $0, $1, $2, $3, SPLIT($4,"#")) USING separator=",";
}
After the loading job has been defined, run the loading job to ingest the data into the graph.
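For this example, that means running:
GSQL > RUN LOADING JOB load_person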