Write a CSV text file from Spark
Problem : How to write a CSV file using Spark. (Github)
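The heavy lifting is done by the Databricks spark-csv data source: once the data is in a DataFrame, writing CSV is a single call on the DataFrameWriter. Below is a minimal sketch of that call, assuming a DataFrame is already in hand; the class and method names in the sketch are only illustrative, and the full, runnable job follows afterwards.

import org.apache.spark.sql.DataFrame;

public class CsvWriteSketch {

    // Minimal sketch (Spark 1.x API): write an existing DataFrame as CSV via spark-csv.
    // The DataFrame and output path are assumed to be supplied by the caller.
    public static void writeCsv(DataFrame df, String outputPath) {
        df.write()
          .format("com.databricks.spark.csv")  // Databricks spark-csv data source
          .option("header", "true")            // emit a header line in each part file
          .save(outputPath);                   // one part-xxxxx file per partition
    }
}

Maven dependencies (Spark 1.6 on CDH 5.9, Scala 2.10 artifacts):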
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
Spark :
package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;

public class RDDtoCSV extends Configured implements Tool, Closeable {

    public static final String INPUT_PATH = "spark.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";
    public static final String NEW_LINE_DELIMETER = "\n";

    private static final Logger LOG = LoggerFactory.getLogger(RDDtoCSV.class);

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        // Set the Spark conf here; once the Spark context exists you can set
        // the Hadoop configuration for InputFormats on it.
        final SparkConf sparkConf = new SparkConf()
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        // The arguments passed in have been split into key/value pairs by ToolRunner
        Configuration conf = getConf();
        String inputPath = conf.get(INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);

        // Get the Spark context; this is the central context, which can be wrapped in any other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
                                               conf.get(DEFAULT_FS),
                                               this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input path has been read and no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // to use a custom InputFormat use javaSparkContext.newAPIHadoopFile() and get an RDD.

        // Schema for the CSV. The various supported column types:
        //   protected lazy val primitiveType: Parser[DataType] =
        //     ( "StringType" ^^^ StringType
        //     | "FloatType" ^^^ FloatType
        //     | "IntegerType" ^^^ IntegerType
        //     | "ByteType" ^^^ ByteType
        //     | "ShortType" ^^^ ShortType
        //     | "DoubleType" ^^^ DoubleType
        //     | "LongType" ^^^ LongType
        //     | "BinaryType" ^^^ BinaryType
        //     | "BooleanType" ^^^ BooleanType
        //     | "DateType" ^^^ DateType
        //     | "DecimalType()" ^^^ DecimalType.USER_DEFAULT
        //     | fixedDecimalType
        //     | "TimestampType" ^^^ TimestampType
        //     )
        final StructType outPutSchemaStructType = new StructType(new StructField[]{
                new StructField("emp_id", IntegerType, false, Metadata.empty()),
                new StructField("emp_name", StringType, false, Metadata.empty()),
        });

        // Read data from the parquet file; the schema of the data is taken from the Avro schema
        DataFrame inputDf = sqlContext.read()
                                      .format(Employee.class.getCanonicalName())
                                      .parquet(inputPath);

        // Convert the DataFrame into a JavaRDD.
        // Each record read from the parquet file is converted into a Row object
        // with the same schema as the parquet file row.
        JavaRDD<Row> rowJavaRDD = inputDf.select("emp_id", "emp_name").javaRDD();

        DataFrame outputDf = sqlContext.createDataFrame(rowJavaRDD, outPutSchemaStructType);

        // show() is an action, so do not leave it enabled unnecessarily
        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the parquet file:");
            outputDf.printSchema();
            outputDf.show();
        }

        // Convert the DataFrame to CSV and save it as text files
        outputDf.write()
                .format("com.databricks.spark.csv")
                // header => true writes a header line in each part file
                .option("header", "true")
                .save(outputPath);

        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new RDDtoCSV(), args);
    }
}
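Note that the NUM_PARTITIONS key (spark.num.partitions) is defined above and passed by the integration test below, but it is never applied in run(). If you want a predictable number of output part files, one option is to coalesce the DataFrame before the write. The helper below is only a sketch of that idea under that assumption, not part of the original job; the class name and the default of one partition are illustrative.

package com.big.data.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.DataFrame;

public class CsvWriteWithPartitions {

    // Sketch: apply spark.num.partitions so the number of output part files is predictable.
    public static void write(Configuration conf, DataFrame outputDf, String outputPath) {
        int numPartitions = conf.getInt(RDDtoCSV.NUM_PARTITIONS, 1);

        outputDf.coalesce(numPartitions)              // shrink to the requested partition count
                .write()
                .format("com.databricks.spark.csv")
                .option("header", "true")
                .save(outputPath);
    }
}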
Integration Test : (Github)
package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

import java.io.File;
import java.io.IOException;

public class RDDtoCSVTest {

    private static final Logger LOG = LoggerFactory.getLogger(RDDtoCSVTest.class);
    private static final String BASEDIR = "/tmp/RDDtoCSVTest/avroparquetInputFile/" + System.currentTimeMillis() + "/";

    private String input;
    private String output;
    private Employee employee;

    @Before
    public void setUp() throws IOException {
        input = BASEDIR + "input/";
        output = BASEDIR + "output/";

        employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick");
        employee.setEmpCountry("DE");

        // Write a parquet input file with GZIP compression
        ParquetWriter<Object> writer = AvroParquetWriter
                .builder(new Path(input + "1.gz.parquet"))
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withSchema(Employee.getClassSchema())
                .build();

        writer.write(employee);
        writer.close();
    }

    @Test
    public void testSuccess() throws Exception {
        String[] args = new String[]{"-D" + RDDtoCSV.INPUT_PATH + "=" + input,
                                     "-D" + RDDtoCSV.OUTPUT_PATH + "=" + output,
                                     "-D" + RDDtoCSV.IS_RUN_LOCALLY + "=true",
                                     "-D" + RDDtoCSV.DEFAULT_FS + "=file:///",
                                     "-D" + RDDtoCSV.NUM_PARTITIONS + "=1"};
        RDDtoCSV.main(args);

        File outputFile = new File(output + "part-00000");
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
        System.out.println(fileToString);

        // header + 1 data line
        Assert.assertEquals(2, fileToString.split(RDDtoCSV.NEW_LINE_DELIMETER).length);
    }

    @After
    public void cleanup() throws IOException {
        FileUtils.deleteDirectory(new File(BASEDIR));
    }
}
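An alternative way to verify the output is to read the generated files back through the spark-csv data source instead of reading the raw part file. The snippet below is only a sketch of that check; the SQLContext is assumed to be available, and the class and method names are illustrative rather than part of the test above.

package com.big.data.spark;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CsvReadBackCheck {

    // Sketch: read the CSV output back with spark-csv and count the data rows.
    public static long countRows(SQLContext sqlContext, String outputPath) {
        DataFrame readBack = sqlContext.read()
                                       .format("com.databricks.spark.csv")
                                       .option("header", "true")       // first line of each file is the header
                                       .option("inferSchema", "true")  // let spark-csv infer the column types
                                       .load(outputPath);

        return readBack.count();  // for the single Employee written above this should be 1
    }
}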