From cd271c3af13586c281374ebd7a3db94bb264265b Mon Sep 17 00:00:00 2001 From: Michael Haeusler Date: Mon, 24 Jan 2011 13:11:09 +0100 Subject: [PATCH] SQOOP-140 (Sqoop throws OutOfMemory exceptions / Java heap space errors when importing large tables from PostgreSQL): - include new PostgreSQL-specific DBRecordReaders which call java.sql.PreparedStatement.setFetchSize() - adapted the DBInputFormats to detect PostgreSQL and use specific DBRecordReaders accordingly --- .../cloudera/sqoop/mapreduce/db/DBInputFormat.java | 5 ++ .../mapreduce/db/DataDrivenDBInputFormat.java | 5 ++ .../mapreduce/db/PostgreSQLDBRecordReader.java | 54 +++++++++++++++++++ .../db/PostgreSQLDataDrivenDBRecordReader.java | 55 ++++++++++++++++++++ 4 files changed, 119 insertions(+), 0 deletions(-) create mode 100644 src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDBRecordReader.java create mode 100644 src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDataDrivenDBRecordReader.java diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java index 523fd9a..f5c18b3 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java @@ -213,6 +213,11 @@ public class DBInputFormat return new MySQLDBRecordReader(split, inputClass, conf, getConnection(), getDBConf(), conditions, fieldNames, tableName); + } else if (dbProductName.startsWith("POSTGRESQL")) { + // use PostgreSQL-specific db reader. + return new PostgreSQLDBRecordReader(split, inputClass, + conf, getConnection(), getDBConf(), conditions, fieldNames, + tableName); } else { // Generic reader. return new DBRecordReader(split, inputClass, diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java index e482edb..238c945 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java +++ b/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java @@ -294,6 +294,11 @@ public class DataDrivenDBInputFormat return new MySQLDataDrivenDBRecordReader(split, inputClass, conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName()); + } else if (dbProductName.startsWith("POSTGRESQL")) { + // use PostgreSQL-specific db reader. + return new PostgreSQLDataDrivenDBRecordReader(split, inputClass, + conf, getConnection(), dbConf, dbConf.getInputConditions(), + dbConf.getInputFieldNames(), dbConf.getInputTableName()); } else { // Generic reader. return new DataDrivenDBRecordReader(split, inputClass, diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDBRecordReader.java new file mode 100644 index 0000000..e15d9bc --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDBRecordReader.java @@ -0,0 +1,54 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce.db; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * A RecordReader that reads records from a PostgreSQL table. + */ +public class PostgreSQLDBRecordReader + extends DBRecordReader { + + // Fetch 50 rows at a time. + private static final int POSTGRESQL_FETCH_SIZE = 1000; + + // CHECKSTYLE:OFF + // TODO(aaron) - refactor DBRecordReader c'tor to use fewer arguments. + public PostgreSQLDBRecordReader(DBInputFormat.DBInputSplit split, + Class inputClass, Configuration conf, Connection conn, + DBConfiguration dbConfig, String cond, String [] fields, String table) + throws SQLException { + super(split, inputClass, conf, conn, dbConfig, cond, fields, table); + } + // CHECKSTYLE:ON + + // Execute statements for PostgreSQL in unbuffered mode. + protected ResultSet executeQuery(String query) throws SQLException { + statement = getConnection().prepareStatement(query, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(POSTGRESQL_FETCH_SIZE); // PostgreSQL: read a limited batch at a time. + return statement.executeQuery(); + } +} diff --git a/src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDataDrivenDBRecordReader.java b/src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDataDrivenDBRecordReader.java new file mode 100644 index 0000000..1ea4a65 --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/db/PostgreSQLDataDrivenDBRecordReader.java @@ -0,0 +1,55 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce.db; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * A RecordReader that reads records from a PostgreSQL table via + * DataDrivenDBRecordReader. + */ +public class PostgreSQLDataDrivenDBRecordReader + extends DataDrivenDBRecordReader { + + // Fetch 50 rows at a time. + private static final int POSTGRESQL_FETCH_SIZE = 1000; + + //CHECKSTYLE:OFF + public PostgreSQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split, + Class inputClass, Configuration conf, Connection conn, + DBConfiguration dbConfig, String cond, String [] fields, String table) + throws SQLException { + super(split, inputClass, conf, conn, dbConfig, cond, fields, table, + "POSTGRESQL"); + } + //CHECKSTYLE:ON + + // Execute statements for PostgreSQL in unbuffered mode. + protected ResultSet executeQuery(String query) throws SQLException { + statement = getConnection().prepareStatement(query, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(POSTGRESQL_FETCH_SIZE); // PostgreSQL: read a limited batch at a time. + return statement.executeQuery(); + } +} -- 1.7.3.5