大家好,我是考100分的小小码 ,祝大家学习进步,加薪顺利呀。今天说一说sqoop导入数据到hive原理_如何促进迁移,希望您对编程的造诣更进一步.
【摘要】
Sqoop是一种用于在
Apache Hadoop
和结构化数据存储(如关系数据库)之间高效传输批量数据的工具 。本文将简单介绍Sqoop作业执行时相关的类及方法,并将该过程与MapReduce的执行结合,分析数据如何从源端迁移到目的端。
Sqoop作业执行过程
-
Initializer:初始化阶段,源数据校验,参数初始化等工作;
-
Partitioner:源数据分片,根据作业并发数来决定源数据要切分多少片;
-
Extractor:开启extractor线程,根据用户配置从内存中构造数据写入队列;
-
Loader:开启loader线程,从队列中读取数据并抛出;
-
Destroyer:资源回收,断开sqoop与数据源的连接,并释放资源;
Initializer
public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration); public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration){ return new LinkedList<String>(); } public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration) { return new NullSchema(); }
代码100分
Destroyer
代码100分public abstract void destroy(DestroyerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);
Partitioner
public abstract List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration);
Partition类中实现了readFields()方法和write()方法,方便读写
代码100分public abstract class Partition { public abstract void readFields(DataInput in) throws IOException; public abstract void write(DataOutput out) throws IOException; public abstract String toString(); }
Extractor
while (resultSet.next()) { ... context.getDataWriter().writeArrayRecord(array); ... }
Loader
while ((array = context.getDataReader().readArrayRecord()) != null) { ... }
MapReduce执行过程
-
SqoopInputFormat的getSplits方法会调用Partitioner类的getPartitions方法
-
将返回的Partition列表包装到SqoopSplit中;
-
默认分片个数为10
-
SqoopMapper包含了一个SqoopMapDataWriter类,
-
Mapper的run()调用Extractor.extract方法,该方法迭代的获取源端数据再调用DataWriter写入
Context
中
private Class SqoopMapDataWriter extends DataWriter { ... private void writeContent() { ... context.wirte(writable, NullWritable.get()); // 这里的writable 是SqoopWritable的一个对象 ... } ... }
注意:这里的Context中存的是KV对,K是SqoopWritable,而V仅是一个空的Writable对象。SqoopWritable中实现了write和readField,用于序列化和反序列化。
-
SqoopOutputFormatLoadExecutor包装了SqoopOuputFormatDataReader,SqoopRecordWriter, ConsumerThread三个内部类;
-
SqoopNullOutputFormat
调用getRecordWriter时创建一个线程:
ConsumerThread
,代码如下
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { executorService = Executors.newSingleThreadExecutor(...); consumerFuture = executorService.submit(new ConsumerThread(context)); return writer; }
- ConsumerThread集成了Runnable接口,线程内部调用Loader.load(…)方法,该方法用DataReader迭代的从Context中读取出SqoopWritable,并将其写入一个中间数据格式再写入目的端数据库中。
private class ConsumerThread implements Runnable { ... public void run() { ... Loader.load(loaderContext, connectorLinkConfig, ConnectorToJobConfig); ... } ... }
:
-
再本地模式下,Sqoop提交任务时没有设置SqoopReducer.class,MR会调用一个默认的reducer.class。
-
setContent
就是SqoopRecordWriter.write(…),它将SqoopWritable反序列化后存入中间存储格式中,即IntermediateDataFormat。与之对应,getContent就是从该中间存储格式中读取数据。 -
Sqoop定义了一个可插拔的中间数据格式抽象类,IntermediateDataFormat类,SqoopWritable打包了这个抽象类用来保存中间数据。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/8251.html