Monday, July 29, 2013

MR on HBase - java.io.IOException: Pass a Delete or a Put

package Aepri.InfoDev;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Job;
public class HbaseMapReduce {

    private static Configuration conf = null;
    static {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("mapred.job.tracker", "192.168.1.155:9001");
        HBASE_CONFIG.set("hbase.master", "192.168.1.155:60000");
        // must match the hbase.zookeeper.quorum value in hbase/conf/hbase-site.xml
        HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.1.155");
        // must match the hbase.zookeeper.property.clientPort value in hbase/conf/hbase-site.xml
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
        conf = HBaseConfiguration.create(HBASE_CONFIG);
    }

    public static class MyMapper extends TableMapper<ImmutableBytesWritable, FloatWritable> {
        // TableMapper fixes the input key/value types to <ImmutableBytesWritable, Result>;
        // the type parameters above are the output types (keyout, valueout)
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws InterruptedException, IOException {
            FloatWritable value = null;
            ImmutableBytesWritable key = null;

            for (KeyValue kv : values.raw()) {
                // System.out.print(new String(kv.getRow()) + " ");
                // System.out.print(new String(kv.getFamily()) + ":");
                // System.out.print(new String(kv.getQualifier()) + " ");
                // System.out.print(kv.getTimestamp() + " ");
                // System.out.println(new String(kv.getValue()));

                // key = Bytes.toString(kv.getRow());
                key = new ImmutableBytesWritable(kv.getRow());
                String tempValue = new String(kv.getValue());
                context.write(key, new FloatWritable(Float.parseFloat(tempValue)));
            }
            // context.write(key,
        }
    }
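
    // NOTE: the method below is declared as "Reduce" (capital R), so it never
    // overrides Reducer.reduce. The default identity reducer runs instead and
    // hands FloatWritable values to TableOutputFormat, which is what triggers
    // the "Pass a Delete or a Put" exception; see the resolution at the end of
    // this post.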
    public static class MyReducer extends TableReducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable> {
        // keyin, valuein, keyout
        public void Reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float sum = 0;
            int count = 0;
            float avg = 0;

            for (FloatWritable val : values) {
                sum += val.get();
                count++;
            }
            if (count == 0) {
                count = 1;
            }
            avg = sum / count;

            Put put = new Put(key.get());
            put.add(Bytes.toBytes("grade"), Bytes.toBytes("ss"), Bytes.toBytes(avg));

            context.write(key, put);
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        String sourcetablename = "tt3";
        String destablename = "tt5";
        Configuration conf2 = HBaseConfiguration.create();
        conf2.set("mapred.job.tracker", "192.168.1.155:9001");

        // Job job = new Job(conf2, "ExampleRead");
        Job job = new Job(conf, "ExampleRead");
        job.setJarByClass(HbaseMapReduce.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        HTable table = new HTable(conf, sourcetablename);
        Scan scan = new Scan();
        scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);
        scan.addFamily(Bytes.toBytes("course"));

        /* ResultScanner rs = null;
        rs = table.getScanner(scan);
        for (Result r : rs) {
            for (KeyValue kv : r.raw()) {
                System.out.print(new String(kv.getRow()) + " ");
                System.out.print(new String(kv.getFamily()) + ":");
                System.out.print(new String(kv.getQualifier()) + " ");
                System.out.print(kv.getTimestamp() + " ");
                System.out.println(new String(kv.getValue()));
            }
        } */

        TableMapReduceUtil.initTableMapperJob(sourcetablename, scan, MyMapper.class,
                ImmutableBytesWritable.class, FloatWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(destablename, MyReducer.class, job);
        job.setNumReduceTasks(1);
        boolean b = job.waitForCompletion(true);
    }

}

The job computes, for each row key, the average of the values in the course column family of table tt3 and writes the results to tt4 (note that the code above uses tt5 as the destination table name).
Data in tt3:
qianguangchao course: 11
qianguangchao course:chn 71.8
qianguangchao course:math 71.8
qianguangchao course:sixiang 71.8
qianguangchao11 course: 12
Expected result in tt4:
qianguangchao grade: (11 + 71.8 + 71.8 + 71.8) / 4 = 56.6
qianguangchao11 grade: 12
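
Once the job runs successfully (see the fix at the end of this post), the result can be checked with a plain Get. A minimal sketch; it assumes the destination table and the grade:ss column from the code above, plus an extra import of org.apache.hadoop.hbase.client.Get:

    HTable dest = new HTable(conf, "tt5");
    Result r = dest.get(new Get(Bytes.toBytes("qianguangchao")));
    byte[] raw = r.getValue(Bytes.toBytes("grade"), Bytes.toBytes("ss"));
    // the reducer stored the average with Bytes.toBytes(float),
    // so it must be read back with Bytes.toFloat, not new String(...)
    System.out.println(Bytes.toFloat(raw)); // expected: 56.6
    dest.close();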

The map phase completes (100%); the reduce phase fails with the following error:

java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:123)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:82)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
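
Two things are worth noting in this trace. First, TableOutputFormat can only persist Put or Delete objects; its record writer rejects everything else with exactly this message. A simplified sketch of that behavior (a paraphrase, not the exact HBase 0.92 source):

    // inside TableOutputFormat.TableRecordWriter (simplified):
    public void write(KEY key, Writable value) throws IOException {
        if (value instanceof Put) {
            table.put(new Put((Put) value));          // persist the Put
        } else if (value instanceof Delete) {
            table.delete(new Delete((Delete) value)); // persist the Delete
        } else {
            throw new IOException("Pass a Delete or a Put");
        }
    }

Second, the frame org.apache.hadoop.mapreduce.Reducer.reduce is the base class's identity reduce, which suggests the custom reducer never ran and the FloatWritable map output was passed straight through to the output format.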
------ Solution --------------------------------------------
I ran into the same problem. Did the original poster ever solve it?
------ Solution --------------------------------------------
Did the original poster solve it? I have hit this problem too!
------ Solution --------------------------------------------
Has anyone else run into this problem?
------ For reference only ----------------------------------
HBase 0.92
Hadoop 1.0.1
------ For reference only ----------------------------------

No. I have already tried the tips that come up on Google. So far I only know that the map phase is not the problem.
------ For reference only ----------------------------------
Resolved. After adding @Override to the overriding method, the compiler flagged the Reduce method as wrong. It should be:
@Override
public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException { ... }
Without @Override the compiler will not check whether the reduce method really overrides the superclass method, so the misnamed Reduce goes unnoticed and the default identity reducer runs instead.
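
For completeness, a sketch of the corrected reducer; the only real changes from the code above are the lowercase method name and the @Override annotation:

    public static class MyReducer extends TableReducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float sum = 0;
            int count = 0;
            // sum all values emitted by the mapper for this row key
            for (FloatWritable val : values) {
                sum += val.get();
                count++;
            }
            float avg = (count == 0) ? 0 : sum / count;

            // TableOutputFormat accepts only Put or Delete as the output value
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("grade"), Bytes.toBytes("ss"), Bytes.toBytes(avg));
            context.write(key, put);
        }
    }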

------ For reference only ----------------------------------
@aimyray Thank you! You helped me a lot! I had checked plenty of English-language sites, but none of the other suggestions worked :)
Kenny
