Accessing Tables in Tablestore with Hive or Hadoop MapReduce
Published: 2025-01-03 18:13
Configure Tablestore as the mapper's data source. The criteria below span the full primary-key range, so the job reads every row in the table:

private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    // Read only the latest version of each column.
    res.setMaxVersions(1);
    // Scan the whole table: from the smallest possible primary key
    // (INF_MIN) to the largest possible primary key (INF_MAX).
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}
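The main method below registers a RowCounterMapper and an IntSumReducer that this excerpt does not show. Here is a minimal sketch of what they could look like, assuming the connector exposes PrimaryKeyWritable and RowWritable as the mapper's input key/value types (those two class names are assumptions, not confirmed by the original article; check your connector version):

public static class RowCounterMapper
        extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
    // Sketch only: PrimaryKeyWritable and RowWritable are assumed
    // connector types.
    private static final Text AGG = new Text("TOTAL");
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(PrimaryKeyWritable key, RowWritable row, Context context)
            throws IOException, InterruptedException {
        // Emit one count per row read from Tablestore.
        context.write(AGG, ONE);
    }
}

public static class IntSumReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the per-row counts; the same class also works as the
        // combiner because its input and output types match.
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        context.write(key, new LongWritable(sum));
    }
}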
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "row count");
    // Ship the connector jar with the job; the relative path means
    // hadoop-connector.jar is expected in the current directory.
    job.addFileToClassPath(new Path("hadoop-connector.jar"));
    job.setJarByClass(RowCounter.class);
    job.setMapperClass(RowCounterMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // Use Tablestore as the job's input source.
    job.setInputFormatClass(TableStoreInputFormat.class);
    TableStoreInputFormat.setEndpoint(job, "https://YourInstance.Region.ots.aliyuncs.com/");
    TableStoreInputFormat.setCredential(job, "YourAccessKeyId", "YourAccessKeySecret");
    TableStoreInputFormat.addCriteria(job, fetchCriteria());
    FileOutputFormat.setOutputPath(job, new Path("output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

The example sets Tablestore as the data source with job.setInputFormatClass(TableStoreInputFormat.class). Beyond that, you also need to:

- Deploy hadoop-connector.jar to the cluster and add it to the classpath. The path passed to addFileToClassPath() is the local path of hadoop-connector.jar; the code assumes the jar sits in the current directory.
- Specify the endpoint and the identity used to access Tablestore. TableStoreInputFormat.setEndpoint() and TableStoreInputFormat.setCredential() set the required Endpoint and AccessKey information.
- Specify the table whose rows are counted, which is what TableStoreInputFormat.addCriteria(job, fetchCriteria()) does.
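For completeness: assembled into a single RowCounter class, the example needs roughly the following imports. This is a sketch; the com.aliyun.openservices.tablestore.hadoop package name for the connector classes is an assumption based on the publicly available Tablestore Hadoop connector and may differ by version.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
// Connector classes; the package name is an assumption, verify it
// against the hadoop-connector.jar you deploy.
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat;

With the class packaged into a jar (row-count.jar is a hypothetical name here) and hadoop-connector.jar in the working directory, the job could be launched like this:

hadoop jar row-count.jar RowCounter

The row count then appears in the output directory passed to FileOutputFormat.setOutputPath().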