Stream Data from AWS S3
You can create a data connector between TigerGraph’s internal Kafka server and your AWS S3 bucket with a specified topic. The connector streams data from the data source in your S3 buckets to TigerGraph’s internal Kafka cluster. You can then create and run a loading job to load data from Kafka into the graph store using the Kafka loader.
1. Prerequisites
-
You have an access key corresponding to an AWS IAM user who has access to the S3 bucket you are loading data from.
2. Procedure
2.1. Specify connector configurations
The connector configurations provide the following information:
-
Connector class
-
Your AWS account credentials
-
Information on how to parse the source data
-
Mapping between connector and source file
2.1.1. Connector class
connector.class=com.tigergraph.kafka.connect.filesystem.aws.S3SourceConnector
The connector class indicates what type of connector the configuration file is used to create.
Connector class is specified by the connector.class
key.
For connecting to AWS S3, its value is
com.tigergraph.kafka.connect.filesystem.aws.S3SourceConnector
.
2.1.2. Provide AWS credentials
The connector can use two methods for authentication.
-
The standard simple credentials provider and your access key.
-
AWS Identity and Access Management (IAM) roles.
The configuration fields are as follows:
-
file.reader.settings.fs.s3a.aws.credentials.provider
: Comma-separated class names of credential provider classes which implementcom.amazonaws.auth.AWSCredentialsProvider
.-
This field should be set to
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
. -
If you want to load data from S3 via an IAM role, set this to
com.amazonaws.auth.InstanceProfileCredentialsProvider
and leave the access key and secret blank.
-
-
file.reader.settings.fs.s3a.access.key
: AWS access key ID. -
file.reader.settings.fs.s3a.secret.key
: AWS access key secret.
2.1.3. Example credential configurations
file.reader.settings.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
file.reader.settings.fs.s3a.access.key=A2V************J
file.reader.settings.fs.s3a.secret.key=wEV************************************8
file.reader.settings.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider
file.reader.settings.fs.s3a.access.key=
file.reader.settings.fs.s3a.secret.key=
2.1.4. Other configurations
The connector uses Hadoop S3A to connect to S3 buckets. The configurations below are required along with our recommended values:
file.reader.settings.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem file.reader.settings.fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A file.reader.settings.fs.s3a.threads.max=1000 file.reader.settings.fs.s3a.connection.maximum=1000
When you want to load data from a private URL, you need an additional two configurations:
file.reader.settings.fs.s3a.endpoint= <S3 endpoint to connect to> file.reader.settings.fs.s3a.endpoint.region= <region where bucket is located>
An updated list of regions and endpoints is located at AWS Service Endpoints.
The other available configuration items can be found at Apache Hadoop Amazon Web Services support – Hadoop-AWS module: Integration with Amazon Web Services.
You can adopt them in your connector config files with prefix file.reader.settings
.
2.1.5. Specify parsing rules
The streaming connector supports the following file types:
-
CSV files
-
JSON files. Each JSON object must be on a separate line.
-
directories
-
tar files
-
zip files
For URIs that point to directories and compressed files, the connector looks for all files inside the directory or the compressed file that match the file.regexp
parameter.
By default, this includes both CSV and JSON files.
If you set file.recursive
to true, the connector looks for files recursively.
The following parsing options are available:
Name | Description | Default |
---|---|---|
|
The regular expression to filter which files to read. The default value matches all files. |
|
|
Whether to retrieve files recursively if the URI points to a directory. |
|
|
The type of file reader to use.
The only supported value is |
|
|
The character that separates columns. This parameter does not affect JSON files. |
|
|
The explicit boundary markers for string tokens, either single or double quotation marks. This parameter does not affect JSON files. The parser will not treat separator characters found within a pair of quotation marks as a separator. Accepted values:
|
Empty string |
|
The default value for a column when its value is null. This parameter does not affect JSON files. |
Empty string |
|
The maximum number of lines to include in a single batch. |
|
|
End of line character. |
|
|
Whether the first line of the files is a header line. If the value is set to true, the first line of the file is ignored during data loading. If you are using JSON files, set this parameter to false. |
|
|
File type for archive files.
Setting the value of this configuration to
|
|
|
If a file has this extension, treat it as a tar file |
|
|
If a file has this extension, treat it as a zip file |
|
|
If a file has this extension, treat it as a gzip file |
|
|
If a file has this extension, treat it as a |
|
2.1.6. Map source file to connector
The below configurations are required:
Name | Description | Default |
---|---|---|
|
Name of the connector. |
None. Must be provided by the user. |
|
Name of the topic to create in Kafka. |
None. Must be provided by the user. |
|
The maximum number of tasks which can run in parallel. |
1 |
|
Number of partitions in the topic used by connector. This only affects newly created topics and is ignored if a topic already exists. |
1 |
|
The path(s) to the data files on Google Cloud Storage. The URI may point to a CSV file, a zip file, a gzip file, or a directory |
None. Must be provided by the user. |
2.1.7. Example
The following is an example configuration file:
connector.class=com.tigergraph.kafka.connect.filesystem.aws.S3SourceConnector file.reader.settings.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider file.reader.settings.fs.s3a.access.key=A2V************J file.reader.settings.fs.s3a.secret.key=wEV************************************8 file.reader.settings.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem file.reader.settings.fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A file.reader.settings.fs.s3a.threads.max=1000 file.reader.settings.fs.s3a.connection.maximum=1000 mode=eof file.regexp=".*" file.recursive=true file.reader.type=text file.reader.batch.size=10000 file.reader.text.eol="\\n" file.reader.text.header=true file.reader.text.archive.type=auto file.reader.text.archive.extensions.tar=tar file.reader.text.archive.extensions.zip=zip file.reader.text.archive.extensions.gzip=gz file.reader.text.archive.extensions.tar.gz=tar.gz,tgz [s3_connector_person_csv] name=s3-person-csv file.reader.batch.size=100 tasks.max=10 topic=s3-person-csv-topic file.uris=s3a://my-bucket/p.csv [s3_connector_friend_csv] name=s3-friend-csv tasks.max=10 topic=s3-friend-csv-topic file.uris=s3a://my-bucket/f.csv [s3_connector_person_tar] name=s3-person-tar tasks.max=10 topic=s3-person-tar-topic file.uris=s3a://my-bucket/p.tar [s3_connector_friend_tar] name=s3-friend-tar tasks.max=10 topic=s3-friend-tar-topic file.uris=s3a://my-bucket/f.tar [s3_connector_person_tgz] name=s3-person-tgz tasks.max=10 topic=s3-person-tgz-topic file.uris=s3a://my-bucket/p.tgz [s3_connector_friend_tgz] name=s3-friend-tgz tasks.max=10 topic=s3-friend-tgz-topic file.uris=s3a://my-bucket/f.tgz [s3_connector_person_zip] name=s3-person-zip tasks.max=10 topic=s3-person-zip-topic file.uris=s3a://my-bucket/p.zip [s3_connector_friend_zip] name=s3-friend-zip tasks.max=10 topic=s3-friend-zip-topic file.uris=s3a://my-bucket/f.zip
2.2. Create connector
Once you have prepared the configuration file, run command gadmin connector create
and specify the configuration file to create the connector:
gadmin connector create --c <config_file>
The connectors start streaming from the data source immediately after creation if the configurations are valid.
You can run gadmin connector status
to verify the status of the connectors.
If the configurations are valid, the connectors should be in RUNNING
status.
Data streamed from the source stays in the destination cluster Kafka topics.
If the source data are in Avro or Parquet formats, the connector converts the data into JSON format when it loads the data into the destination Kafka cluster (often TigerGraph’s internal Kafka cluster). |
If the destination cluster is TigerGraph’s internal Kafka cluster, the messages are removed as they are loaded in to the graph during the course of the loading job at regular intervals.
Automatic removal of loaded Kafka messages is an alpha feature and may be subject to change. |
2.3. Create data source
Now that the connector has started loading data into TigerGraph’s internal Kafka cluster, you can create a data source and point it to the Kafka cluster:
-
Create a data source configuration file. The only required field is the address of the broker. The broker’s IP and hostname is the private IP of your TigerGraph server and port of TigerGraph’s internal Kafka server:
-
If you don’t know your private IP, you can retrieve it by running
gmyip
in the bash terminal. -
To retrieve the port of your Kafka cluster, run
gadmin config get Kafka.Port
to retrieve the port number. The default port is30002
.
In the
kafka.config
field, you can provide additional configurations from librdkafka Global Configurations. For example, you can specifygroup.id
to betigergraph
to specify the group that this consumer belongs to:{ "broker":"10.128.0.240:30002", (1) "kafka_config": { "session.timeout.ms":"20000" } }
1 Make sure to use the internal ID of the machine instead of localhost
for the hostname of the broker.localhost
only works for single-server instances. -
-
Run
CREATE DATA SOURCE
to create the data source and assign the configuration file to the data source you just created:CREATE DATA_SOURCE KAFKA <datasource_name> FOR GRAPH <graph_name> SET <datasource_name> = <path_to_datasource_config>
For example:
CREATE DATA_SOURCE KAFKA k1 FOR GRAPH Social SET k1 = "/home/tigergraph/social/k1.config"
2.4. Create loading job
Create a loading job to load data from the data source:
-
Create a topic-partition configuration for each topic.
{ "topic": <topic_name>, (1) "partition_list": [ (2) { "start_offset": <offset_value>, (3) "partition": <partition_number> (4) }, { "start_offset": <offset_value>, (3) "partition": <partition_number> (4) } ... ] }
1 Replace <topic_name>
with the name of the topic this configuration applies to. This must be one of the topics you configured in the connector configuration step.2 List of partitions you want to stream from. For each partition, you can set a start offset. If this list is empty, or partition_list
isn’t included, all partitions are used with the default offset.3 Replace <offset_value>
with the offset value you want. The default offset for loading is-1
, which means you will load data from the most recent message in the topic. If you want to load from the beginning of a topic, thestart_offset
value should be-2
.4 Replace <partition_number>
with the partition number if you want to configure. -
Create a loading job and map data to graph. See Kafka loader guide for how to map data from a Kafka data source to the graph. See Loading JSON data on how to create a loading job for JSON data.
Known bug: to use the -1 value for offset, delete the start_offset key instead of setting it to -1 .
|
For example, suppose we have the following two CSV files and schema:
CREATE VERTEX Person (PRIMARY_ID name STRING, name STRING, age INT, gender STRING, state STRING)
CREATE UNDIRECTED EDGE Friendship (FROM Person, TO Person, connect_day DATETIME)
CREATE GRAPH Social (Person, Friendship)
name,gender,age,state
Tom,male,40,ca
Dan,male,34,ny
Jenny,female,25,tx
Kevin,male,28,az
Amily,female,22,ca
Nancy,female,20,ky
Jack,male,26,fl
A,male,29,ca
person1,person2,date
Tom,Dan,2017-06-03
Tom,Jenny,2015-01-01
Dan,Jenny,2016-08-03
Jenny,Amily,2015-06-08
Dan,Nancy,2016-01-03
Nancy,Jack,2017-03-02
Dan,Kevin,2015-12-30
Amily,Dan,1990-1-1
The following topic-partition configurations and loading job will load the two CSV files into the graph:
{
"topic": "person-demo-104",
"partition_list": [
{
"start_offset": -2,
"partition": 0
}
]
}
{
"topic": "friend-demo-104",
"partition_list": [
{
"start_offset": -2,
"partition": 0
}
]
}
CREATE LOADING JOB load_person FOR GRAPH Social {
DEFINE FILENAME f1 = "$k1:/home/mydata/topic_person.json"; (1)
DEFINE FILENAME f2 = "$k1:/home/mydata/topic_friend.json";
LOAD f1 TO VERTEX person VALUES ($0, $0, $2, $1, $3) USING separator=",";
LOAD f2 TO EDGE friendship VALUES ($0, $1, $2) USING separator=",";
}
1 | $k1 refers to a data source defined in the graph.
See Create data source |
2.5. Run loading job
Run the loading job created in the last step will load the streamed data into the graph.
If you make changes to the topic-partition configuration file, you can overwrite the values for the filename variables with the USING
clause.
GSQL > RUN LOADING JOB load_person
By default, loading jobs that use Kafka data sources run in streaming mode and do not stop until manually aborted. As data is streamed from the data source, the running loading job will continuously ingest the streamed data into the graph store.