Java 类org.apache.hadoop.util.ReflectionUtils 实例源码

项目:hadoop    文件:CompressionEmulationUtil.java   
/**
 * Opens an output stream for {@code file}, compressing it when applicable.
 *
 * <p>When the job configuration asks for compressed output, the configured
 * codec (falling back to {@link GzipCodec}) is instantiated and its default
 * extension is appended to the file name. The stream is wrapped by the codec
 * only if compression emulation is enabled; otherwise a plain stream to the
 * (already suffixed) file is returned. The file is created without overwrite.
 *
 * @param file the target file
 * @param conf the configuration to read compression settings from
 * @return a stream writing to the (possibly renamed) file
 * @throws IOException if the file system or file cannot be opened
 */
static OutputStream getPossiblyCompressedOutputStream(Path file, 
                                                      Configuration conf)
throws IOException {
  FileSystem fs = file.getFileSystem(conf);
  JobConf jobConf = new JobConf(conf);
  boolean compressOutput =
    org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jobConf);
  if (!compressOutput) {
    return fs.create(file, false);
  }

  // resolve and instantiate the codec
  Class<? extends CompressionCodec> codecClass =
    org.apache.hadoop.mapred.FileOutputFormat
                            .getOutputCompressorClass(jobConf, 
                                                      GzipCodec.class);
  CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);

  // the codec extension is added regardless of the emulation setting
  Path target = file.suffix(codec.getDefaultExtension());

  if (!isCompressionEmulationEnabled(conf)) {
    return fs.create(target, false);
  }
  FSDataOutputStream rawOut = fs.create(target, false);
  return new DataOutputStream(codec.createOutputStream(rawOut));
}
项目:angel    文件:DFSStorageOldAPI.java   
/**
 * Creates the old-API (mapred) record reader for {@code split} and installs
 * it through {@code setReader}.
 *
 * <p>The input format class name is read from the worker configuration
 * ({@code AngelConf.ANGEL_INPUTFORMAT_CLASS}, with a default).
 *
 * @throws IOException wrapping any failure raised while loading the class,
 *                     instantiating the format or opening the reader
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public void initReader() throws IOException {
  try {
    Configuration workerConf = WorkerContext.get().getConf();
    String formatName = workerConf.get(
        AngelConf.ANGEL_INPUTFORMAT_CLASS,
        AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS);

    Class<? extends org.apache.hadoop.mapred.InputFormat> formatClass =
        (Class<? extends org.apache.hadoop.mapred.InputFormat>)
            Class.forName(formatName);

    // The format and the reader each receive their own JobConf copy.
    org.apache.hadoop.mapred.InputFormat format =
        ReflectionUtils.newInstance(formatClass, new JobConf(workerConf));
    org.apache.hadoop.mapred.RecordReader<KEY, VALUE> reader =
        format.getRecordReader(split, new JobConf(workerConf), Reporter.NULL);

    setReader(new DFSReaderOldAPI(reader));
  } catch (Exception cause) {
    LOG.error("init reader error ", cause);
    throw new IOException(cause);
  }
}
项目:hadoop    文件:CompositeInputSplit.java   
/**
 * {@inheritDoc}
 * @throws IOException If the child InputSplit cannot be read, typically
 *                     for failing access checks.
 */
@SuppressWarnings("unchecked")  // Generic array assignment
public void readFields(DataInput in) throws IOException {
  final int count = WritableUtils.readVInt(in);
  // Reuse the existing array only when its size already matches.
  if (splits == null || splits.length != count) {
    splits = new InputSplit[count];
  }
  Class<? extends InputSplit>[] splitClasses = new Class[count];
  try {
    // All class names come first in the stream ...
    for (int n = 0; n < count; n++) {
      String className = Text.readString(in);
      splitClasses[n] = Class.forName(className).asSubclass(InputSplit.class);
    }
    // ... followed by the serialized fields of each split, in the same order.
    for (int n = 0; n < count; n++) {
      splits[n] = ReflectionUtils.newInstance(splitClasses[n], null);
      splits[n].readFields(in);
    }
  } catch (ClassNotFoundException e) {
    throw new IOException("Failed split init", e);
  }
}
项目:hadoop    文件:ResourceManager.java   
/**
 * Instantiates the scheduler named by {@code YarnConfiguration.RM_SCHEDULER}
 * (or the default scheduler when the key is unset).
 *
 * @return a new scheduler instance
 * @throws YarnRuntimeException if the class cannot be found or does not
 *         implement {@link ResourceScheduler}
 */
protected ResourceScheduler createScheduler() {
  String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
      YarnConfiguration.DEFAULT_RM_SCHEDULER);
  LOG.info("Using Scheduler: " + schedulerClassName);

  final Class<?> schedulerClazz;
  try {
    schedulerClazz = Class.forName(schedulerClassName);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Scheduler: "
        + schedulerClassName, e);
  }

  if (!ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
    throw new YarnRuntimeException("Class: " + schedulerClassName
        + " not instance of " + ResourceScheduler.class.getCanonicalName());
  }
  return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
      this.conf);
}
项目:hadoop-oss    文件:HttpServer2.java   
/**
 * Instantiates the filter initializers listed under
 * {@code FILTER_INITIALIZER_PROPERTY}.
 *
 * @param conf the configuration to read; may be {@code null}
 * @return one initializer per configured class, or {@code null} when
 *         {@code conf} is {@code null} or it yields no class list
 */
private static FilterInitializer[] getFilterInitializers(Configuration conf) {
  Class<?>[] initializerClasses =
      (conf == null) ? null : conf.getClasses(FILTER_INITIALIZER_PROPERTY);
  if (initializerClasses == null) {
    return null;
  }

  FilterInitializer[] result =
      new FilterInitializer[initializerClasses.length];
  int slot = 0;
  for (Class<?> initializerClass : initializerClasses) {
    result[slot++] = (FilterInitializer) ReflectionUtils.newInstance(
        initializerClass, conf);
  }
  return result;
}
项目:hadoop    文件:MapTask.java   
/**
 * Sets up the sorting collector and the partitioner for new-API map output.
 *
 * <p>With more than one reduce task the job's configured partitioner class is
 * instantiated; otherwise a trivial partitioner that always answers
 * {@code partitions - 1} is used.
 */
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions <= 1) {
    // At most one partition: no user partitioner is needed.
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  } else {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  }
}
项目:hadoop-oss    文件:CompressionCodecFactory.java   
/**
 * Builds the factory from the codec classes returned by
 * {@code getCodecClasses(conf)} and registers each one. When no codec classes
 * are configured, {@link GzipCodec} and {@link DefaultCodec} are registered.
 *
 * @param conf the configuration used to look up and instantiate codecs
 */
public CompressionCodecFactory(Configuration conf) {
  codecs = new TreeMap<String, CompressionCodec>();
  codecsByClassName = new HashMap<String, CompressionCodec>();
  codecsByName = new HashMap<String, CompressionCodec>();

  List<Class<? extends CompressionCodec>> configured = getCodecClasses(conf);
  boolean nothingConfigured = configured == null || configured.isEmpty();
  if (nothingConfigured) {
    addCodec(new GzipCodec());
    addCodec(new DefaultCodec());
    return;
  }
  for (Class<? extends CompressionCodec> configuredClass : configured) {
    addCodec(ReflectionUtils.newInstance(configuredClass, conf));
  }
}
项目:hadoop-oss    文件:WritableComparator.java   
/**
 * Get a comparator for a {@link WritableComparable} implementation.
 *
 * <p>Looks in the {@code comparators} registry first; on a miss, forces the
 * class to initialize (so it can register itself) and looks again; if still
 * absent, creates a generic {@link WritableComparator}. The supplied
 * configuration is applied to whichever comparator is returned.
 *
 * @param c the key class
 * @param conf the configuration to set on the comparator
 * @return the comparator for {@code c}
 */
public static WritableComparator get(
    Class<? extends WritableComparable> c, Configuration conf) {
  WritableComparator result = comparators.get(c);
  if (result == null) {
    // force the static initializers to run, then retry the lookup
    forceInit(c);
    result = comparators.get(c);
  }
  if (result == null) {
    // nothing registered: use the generic one
    result = new WritableComparator(c, conf, true);
  }
  // Newly passed Configuration objects should be used.
  ReflectionUtils.setConf(result, conf);
  return result;
}
项目:hadoop    文件:Chain.java   
/**
 * Copies {@code obj} by serializing it into a buffer and deserializing the
 * bytes into a freshly created instance of the same class.
 *
 * @param serialization supplies the serializer and deserializer for
 *                      {@code obj}'s class
 * @param obj the object to copy
 * @return the new instance that the bytes were deserialized into
 * @throws IOException if serialization or deserialization fails
 */
private <E> E makeCopyForPassByValue(Serialization<E> serialization,
                                      E obj) throws IOException {
  Serializer<E> ser =
    serialization.getSerializer(GenericsUtil.getClass(obj));
  Deserializer<E> deser =
    serialization.getDeserializer(GenericsUtil.getClass(obj));

  // The buffer comes from a thread-local and is reset before each use.
  DataOutputBuffer dof = threadLocalDataOutputBuffer.get();

  dof.reset();
  ser.open(dof);
  ser.serialize(obj);
  ser.close();
  // From here on obj refers to a new instance, not the caller's object.
  obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
                                    getChainJobConf());
  // Only the first getLength() bytes of the backing array are valid data.
  ByteArrayInputStream bais =
    new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
  deser.open(bais);
  // The return value of deserialize() is ignored; obj itself is returned.
  deser.deserialize(obj);
  deser.close();
  return obj;
}
项目:ditb    文件:Encryption.java   
/**
 * Returns the key provider selected by the configuration, creating and
 * caching it on first use.
 *
 * <p>Providers are cached per (class name, parameters) pair. A newly created
 * provider is initialized with the configured parameter string before being
 * cached.
 *
 * @param conf the configuration naming the provider class and parameters
 * @return the cached or newly created provider
 * @throws RuntimeException wrapping any failure during lookup or creation
 */
public static KeyProvider getKeyProvider(Configuration conf) {
  String providerClassName = conf.get(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
    KeyStoreKeyProvider.class.getName());
  String providerParameters = conf.get(HConstants.CRYPTO_KEYPROVIDER_PARAMETERS_KEY, "");
  try {
    Pair<String,String> cacheKey =
      new Pair<String,String>(providerClassName, providerParameters);
    KeyProvider cached = keyProviderCache.get(cacheKey);
    if (cached != null) {
      return cached;
    }

    Class<?> providerClass =
      getClassLoaderForClass(KeyProvider.class).loadClass(providerClassName);
    KeyProvider created =
      (KeyProvider) ReflectionUtils.newInstance(providerClass, conf);
    created.init(providerParameters);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Installed " + providerClassName + " into key provider cache");
    }
    keyProviderCache.put(cacheKey, created);
    return created;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
项目:hadoop-oss    文件:TestWritable.java   
/**
 * Utility method for testing writables: writes {@code before} to a buffer,
 * reads the bytes back into a new instance of the same class and asserts
 * that both are equal.
 *
 * @param before the writable to round-trip
 * @param conf the configuration used to instantiate the copy
 * @return the instance read back from the buffer
 */
public static Writable testWritable(Writable before
        , Configuration conf) throws Exception {
  DataOutputBuffer serialized = new DataOutputBuffer();
  before.write(serialized);

  DataInputBuffer source = new DataInputBuffer();
  source.reset(serialized.getData(), serialized.getLength());

  Writable roundTripped = (Writable)ReflectionUtils.newInstance(
        before.getClass(), conf);
  roundTripped.readFields(source);

  assertEquals(before, roundTripped);
  return roundTripped;
}
项目:hadoop    文件:AbstractReservationSystem.java   
/**
 * Instantiates the plan follower named by
 * {@code YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER}, falling back
 * to {@code getDefaultPlanFollower()}.
 *
 * @return the plan follower, or {@code null} when no class name is available
 * @throws YarnRuntimeException if the class cannot be found or is not a
 *         {@link PlanFollower}
 */
private PlanFollower createPlanFollower() {
  String planFollowerPolicyClassName =
      conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
          getDefaultPlanFollower());
  if (planFollowerPolicyClassName == null) {
    return null;
  }
  LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);

  final Class<?> followerClass;
  try {
    followerClass = conf.getClassByName(planFollowerPolicyClassName);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException(
        "Could not instantiate PlanFollowerPolicy: "
            + planFollowerPolicyClassName, e);
  }

  if (!PlanFollower.class.isAssignableFrom(followerClass)) {
    throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
        + " not instance of " + PlanFollower.class.getCanonicalName());
  }
  return (PlanFollower) ReflectionUtils.newInstance(followerClass, conf);
}
项目:spark_deep    文件:Server.java   
/**
 * Decodes one request from {@code buf} and queues it for handling.
 *
 * <p>The buffer starts with an int call id followed by the serialized
 * parameter of type {@code paramClass}.
 *
 * @param buf the raw request bytes
 * @throws IOException if the id or parameter cannot be read
 * @throws InterruptedException if interrupted while queueing the call
 */
private void processData(byte[] buf) throws  IOException, InterruptedException {
  DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
  int callId = in.readInt();                 // the call id leads the payload

  if (LOG.isDebugEnabled()) {
    LOG.debug(" got #" + callId);
  }

  // instantiate the parameter holder and fill it from the stream
  Writable param = ReflectionUtils.newInstance(paramClass, conf);
  ((RPC.Invocation)param).setConf(conf);
  param.readFields(in);

  // queue the call; put() may block here
  callQueue.put(new Call(callId, param, this));
  incRpcCount();  // Increment the rpc count
}
项目:ditb    文件:IndexFile.java   
/**
 * Replaces the contents of the backing map with entries read from {@code in}.
 *
 * <p>The stream holds an int entry count, then for each entry a byte-array
 * key, a one-byte class id and the value. Values whose class is
 * {@code byte[]} are read as raw byte arrays; all others are instantiated as
 * {@link Writable}s and read through {@code readFields}.
 *
 * @param in the stream to read from
 * @throws IOException if reading fails
 */
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
  // Start from an empty map so repeated calls do not accumulate entries.
  this.instance.clear();
  final int entryCount = in.readInt();
  for (int n = 0; n < entryCount; n++) {
    byte[] key = Bytes.readByteArray(in);
    byte classId = in.readByte();
    Class<?> valueClass = getClass(classId);
    final V value;
    if (valueClass.equals(byte[].class)) {
      value = (V) Bytes.readByteArray(in);
    } else {
      Writable writable =
          (Writable) ReflectionUtils.newInstance(valueClass, getConf());
      writable.readFields(in);
      value = (V) writable;
    }
    this.instance.put(key, value);
  }
}
项目:hadoop    文件:SharedCacheManager.java   
/**
 * Instantiates the shared cache manager store.
 *
 * <p>The class is taken from {@code YarnConfiguration.SCM_STORE_CLASS}; when
 * unset, the class named by {@code YarnConfiguration.DEFAULT_SCM_STORE_CLASS}
 * is used.
 *
 * @param conf the configuration to read the store class from
 * @return a new store instance
 * @throws YarnRuntimeException if the default store class cannot be loaded
 */
@SuppressWarnings("unchecked")
private static SCMStore createSCMStoreService(Configuration conf) {
  Class<? extends SCMStore> defaultStoreClass;
  try {
    defaultStoreClass =
        (Class<? extends SCMStore>) Class
            .forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
  } catch (Exception e) {
    // Separator added so the class name is not glued to the preceding word.
    throw new YarnRuntimeException("Invalid default scm store class: "
        + YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
  }

  SCMStore store =
      ReflectionUtils.newInstance(conf.getClass(
          YarnConfiguration.SCM_STORE_CLASS,
          defaultStoreClass, SCMStore.class), conf);
  return store;
}
项目:aliyun-maxcompute-data-collectors    文件:NetezzaExternalTableHCatImportMapper.java   
/**
 * Prepares the mapper: creates the HCat import helper and instantiates the
 * record class named by {@code ConfigurationHelper.getDbInputClassProperty()}.
 *
 * @throws IOException if the class name is not configured, the class cannot
 *                     be found, or no record instance was produced
 */
@Override
protected void setup(Context context)
  throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  helper = new SqoopHCatImportHelper(conf);

  String recordClassName = conf.get(ConfigurationHelper
    .getDbInputClassProperty());
  if (recordClassName == null) {
    throw new IOException("DB Input class name is not set!");
  }

  try {
    // Resolve through the thread's context class loader.
    Class<?> recordClass = Class.forName(recordClassName, true,
      Thread.currentThread().getContextClassLoader());
    sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(recordClass, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (sqoopRecord == null) {
    throw new IOException("Could not instantiate object of type "
      + recordClassName);
  }
}
项目:aliyun-maxcompute-data-collectors    文件:ParquetExportMapper.java   
/**
 * Prepares the mapper: instantiates the user's record class named by
 * {@code ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY} and loads the column
 * type map stored under {@code AVRO_COLUMN_TYPES_MAP}.
 *
 * @throws IOException if the class name is not configured, the class cannot
 *                     be found, or no record instance was produced
 */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);

  Configuration conf = context.getConfiguration();

  // The record class holds and parses each exported record.
  String recordClassName = conf.get(
      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
  if (recordClassName == null) {
    throw new IOException("Export table class name ("
        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
        + ") is not set!");
  }

  try {
    Class<?> recordClass = Class.forName(recordClassName, true,
        Thread.currentThread().getContextClassLoader());
    recordImpl = (SqoopRecord) ReflectionUtils.newInstance(recordClass, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }

  if (recordImpl == null) {
    throw new IOException("Could not instantiate object of type "
        + recordClassName);
  }

  columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
      MapWritable.class);
}
项目:aliyun-maxcompute-data-collectors    文件:CodecMap.java   
/**
 * Given a codec name, instantiate the concrete implementation
 * class that implements it.
 *
 * <p>{@code getCodecByName} is consulted first; if it yields nothing, the
 * name is mapped to a class name through {@code getCodecClassName}.
 *
 * @param codecName the codec name to resolve
 * @param conf the configuration used to load and instantiate the codec
 * @return the codec, or {@code null} when no class name is mapped to
 *         {@code codecName}
 * @throws com.cloudera.sqoop.io.UnsupportedCodecException if the mapped
 * codec class cannot be loaded.
 */
public static CompressionCodec getCodec(String codecName,
  Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
  // Try standard Hadoop mechanism first
  CompressionCodec hadoopCodec = getCodecByName(codecName, conf);
  if (hadoopCodec != null) {
    return hadoopCodec;
  }

  // Fall back to Sqoop mechanism
  String codecClassName = null;
  try {
    codecClassName = getCodecClassName(codecName);
    if (codecClassName == null) {
      return null;
    }
    Class<? extends CompressionCodec> resolved =
        (Class<? extends CompressionCodec>)
        conf.getClassByName(codecClassName);
    CompressionCodec sqoopCodec =
        (CompressionCodec) ReflectionUtils.newInstance(resolved, conf);
    return sqoopCodec;
  } catch (ClassNotFoundException cnfe) {
    throw new com.cloudera.sqoop.io.UnsupportedCodecException(
        "Cannot find codec class "
        + codecClassName + " for codec " + codecName);
  }
}
项目:aliyun-maxcompute-data-collectors    文件:JobBase.java   
/**
 * Instantiates the validator, threshold and failure handler classes named in
 * {@code options}, logs which implementations are in use and runs the
 * validation.
 *
 * @param options supplies the three implementation classes
 * @param conf the configuration passed to each new instance
 * @param validationContext the context handed to the validator
 * @throws ValidationException if validation fails
 */
protected void doValidate(SqoopOptions options, Configuration conf,
                          ValidationContext validationContext)
  throws ValidationException {
  Validator validator = (Validator) ReflectionUtils.newInstance(
      options.getValidatorClass(), conf);
  ValidationThreshold threshold = (ValidationThreshold)
      ReflectionUtils.newInstance(options.getValidationThresholdClass(),
        conf);
  ValidationFailureHandler failureHandler = (ValidationFailureHandler)
      ReflectionUtils.newInstance(options.getValidationFailureHandlerClass(),
        conf);

  String validatorName = validator.getClass().getName();
  String thresholdName = threshold.getClass().getName();
  String handlerName = failureHandler.getClass().getName();

  StringBuilder summary = new StringBuilder(
      "Validating the integrity of the import using the "
    + "following configuration\n");
  summary.append("\tValidator : ").append(validatorName).append('\n');
  summary.append("\tThreshold Specifier : ").append(thresholdName).append('\n');
  summary.append("\tFailure Handler : ").append(handlerName).append('\n');
  LOG.info(summary.toString());

  validator.validate(validationContext, threshold, failureHandler);
}
项目:ditb    文件:HRegionServer.java   
/**
 * Loads {@code classname} through the thread's context class loader,
 * instantiates it as a {@link ReplicationService} and initializes it.
 *
 * @param classname fully qualified name of the replication service class
 * @param conf the configuration passed to the new instance
 * @param server the region server handed to {@code initialize}
 * @param fs the file system handed to {@code initialize}
 * @param logDir the log directory handed to {@code initialize}
 * @param oldLogDir the old-log directory handed to {@code initialize}
 * @return the initialized service
 * @throws IOException if the class cannot be found (the original
 *                     {@link ClassNotFoundException} is attached as cause)
 *                     or initialization fails
 */
static private ReplicationService newReplicationInstance(String classname, Configuration conf,
    HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {

  final Class<?> clazz;
  try {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    clazz = Class.forName(classname, true, classLoader);
  } catch (java.lang.ClassNotFoundException nfe) {
    // Keep the cause so the missing class / loader detail is not lost.
    throw new IOException("Could not find class for " + classname, nfe);
  }

  // create an instance of the replication object.
  ReplicationService service = (ReplicationService) ReflectionUtils.newInstance(clazz, conf);
  service.initialize(server, fs, logDir, oldLogDir);
  return service;
}
项目:hadoop    文件:AbstractReservationSystem.java   
/**
 * Instantiates and initializes the replanner configured for a plan queue.
 *
 * @param planQueueName the queue whose replanner is requested
 * @return the initialized planner
 * @throws YarnRuntimeException if the planner class cannot be found or is
 *         not a {@link Planner}
 */
protected Planner getReplanner(String planQueueName) {
  ReservationSchedulerConfiguration reservationConfig =
      getReservationSchedulerConfiguration();
  String plannerClassName = reservationConfig.getReplanner(planQueueName);
  LOG.info("Using Replanner: " + plannerClassName + " for queue: "
      + planQueueName);
  try {
    Class<?> plannerClazz = conf.getClassByName(plannerClassName);
    if (!Planner.class.isAssignableFrom(plannerClazz)) {
      throw new YarnRuntimeException("Class: " + plannerClazz
          + " not instance of " + Planner.class.getCanonicalName());
    }
    Planner replanner =
        (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
    replanner.init(planQueueName, reservationConfig);
    return replanner;
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Planner: "
        + plannerClassName + " for queue: " + planQueueName, e);
  }
}
项目:hadoop    文件:RMProxy.java   
/**
 * Helper method to create FailoverProxyProvider.
 *
 * <p>The provider class is taken from
 * {@code YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER}; when unset, the
 * class named by {@code YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER}
 * is used. The new provider is initialized with this proxy and the protocol.
 *
 * @param conf the configuration to read the provider class from
 * @param protocol the protocol class the provider will serve
 * @return the initialized provider
 * @throws YarnRuntimeException if the default provider class cannot be loaded
 */
private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
    Configuration conf, Class<T> protocol) {
  Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
  try {
    defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
        Class.forName(
            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
  } catch (Exception e) {
    // Separator added so the class name is not glued to the preceding word.
    throw new YarnRuntimeException("Invalid default failover provider class: " +
        YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
  }

  RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
      conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
          defaultProviderClass, RMFailoverProxyProvider.class), conf);
  provider.init(conf, (RMProxy<T>) this, protocol);
  return provider;
}
项目:hadoop    文件:SharedCacheChecksumFactory.java   
/**
 * Get a <code>SharedCacheChecksum</code> object based on the configurable
 * algorithm implementation
 * (see <code>yarn.sharedcache.checksum.algo.impl</code>).
 *
 * <p>Instances are kept in {@code instances}, keyed by implementation class.
 * If a new instance loses the {@code putIfAbsent} race, the instance already
 * stored is returned instead.
 *
 * @return <code>SharedCacheChecksum</code> object
 * @throws YarnRuntimeException wrapping any failure while instantiating
 */
public static SharedCacheChecksum getChecksum(Configuration conf) {
  Class<? extends SharedCacheChecksum> implClass =
      conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
      defaultAlgorithm, SharedCacheChecksum.class);
  SharedCacheChecksum cached = instances.get(implClass);
  if (cached != null) {
    return cached;
  }

  try {
    SharedCacheChecksum fresh = ReflectionUtils.newInstance(implClass, conf);
    SharedCacheChecksum existing = instances.putIfAbsent(implClass, fresh);
    return (existing != null) ? existing : fresh;
  } catch (Exception e) {
    throw new YarnRuntimeException(e);
  }
}
项目:hadoop    文件:FileInputFormat.java   
/**
 * Get a PathFilter instance of the filter set for the input paths.
 *
 * @param context the job context whose configuration is consulted
 * @return the PathFilter instance set for the job, NULL if none has been set.
 */
public static PathFilter getInputPathFilter(JobContext context) {
  Configuration conf = context.getConfiguration();
  Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
      PathFilter.class);
  if (filterClass == null) {
    return null;
  }
  return (PathFilter) ReflectionUtils.newInstance(filterClass, conf);
}
项目:wherehowsX    文件:SequenceFileAnalyzer.java   
/**
 * Reads up to 12 key/value pairs from a sequence file and returns them as
 * sample data.
 *
 * <p>This is best-effort: when the path does not exist, or the file cannot
 * be read as a sequence file, the problem is logged and {@code null} is
 * returned. The reader is always closed.
 *
 * @param path the sequence file to sample
 * @return the sample record, or {@code null} if no sample could be produced
 * @throws IOException if checking the path's existence fails
 */
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
    if (!fs.exists(path)) {
        LOG.error("sequence file : " + path.toUri().getPath() + " is not exist on hdfs");
        return null;
    }

    final int maxSamples = 12;
    final String keyName = "Key";
    final String valueName = "Value";

    LOG.info("sequencefileanalyzer start parse sampledata for  file path : {}", path.toUri().getPath());
    // try-with-resources: the reader was previously never closed.
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path))) {
        List<Object> sampleValues = new ArrayList<Object>();
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
        int count = 0;
        while (reader.next(key, value) && count < maxSamples) {
            sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
            count++;
        }
        SampleDataRecord dataRecord = new SampleDataRecord(path.toUri().getPath(), sampleValues);
        LOG.info("sequence file path : {}, sample data is {}", path.toUri().getPath(), sampleValues);
        return dataRecord;
    } catch (Exception e) {
        // Pass the throwable as the last argument so the logger prints the
        // real stack trace (getStackTrace().toString() only printed an
        // array identity).
        LOG.error("path : {} content " + " is not Sequence File format content  ", path.toUri().getPath(), e);
        return null;
    }
}
项目:hadoop    文件:HttpServer2.java   
/**
 * Writes thread information to the response as UTF-8 plain text and also
 * logs it. Nothing is written when instrumentation access is not allowed.
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
  throws ServletException, IOException {
  boolean accessAllowed = HttpServer2.isInstrumentationAccessAllowed(
      getServletContext(), request, response);
  if (accessAllowed) {
    response.setContentType("text/plain; charset=UTF-8");
    try (PrintStream threadDump = new PrintStream(
        response.getOutputStream(), false, "UTF-8")) {
      ReflectionUtils.printThreadInfo(threadDump, "");
    }
    ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
  }
}
项目:hadoop    文件:ResourceUsageMatcher.java   
/**
 * Configure the {@link ResourceUsageMatcher} to load the configured plugins
 * and initialize them.
 *
 * <p>Plugin classes are read from {@code RESOURCE_USAGE_EMULATION_PLUGINS};
 * {@code null} entries are skipped. Every plugin held in
 * {@code emulationPlugins} is initialized once loading has finished.
 *
 * @param conf the configuration naming the plugin classes
 * @param monitor passed to each plugin's {@code initialize}
 * @param metrics passed to each plugin's {@code initialize}
 * @param progress passed to each plugin's {@code initialize}
 * @throws RuntimeException if a configured class is not a
 *         {@link ResourceUsageEmulatorPlugin}
 */
@SuppressWarnings("unchecked")
public void configure(Configuration conf, ResourceCalculatorPlugin monitor, 
                      ResourceUsageMetrics metrics, Progressive progress) {
  Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS);
  if (plugins == null) {
    System.out.println("No resource usage emulator plugins configured.");
  } else {
    for (Class clazz : plugins) {
      if (clazz != null) {
        if (ResourceUsageEmulatorPlugin.class.isAssignableFrom(clazz)) {
          ResourceUsageEmulatorPlugin plugin = 
            (ResourceUsageEmulatorPlugin) ReflectionUtils.newInstance(clazz, 
                                                                      conf);
          emulationPlugins.add(plugin);
        } else {
          // clazz is already a Class: getName() names the offending plugin,
          // whereas getClass().getName() would always be java.lang.Class.
          throw new RuntimeException("Misconfigured resource usage plugins. " 
              + "Class " + clazz.getName() + " is not a resource "
              + "usage plugin as it does not extend "
              + ResourceUsageEmulatorPlugin.class.getName());
        }
      }
    }
  }

  // initialize the emulators once all the configured emulator plugins are
  // loaded
  for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
    emulator.initialize(conf, metrics, monitor, progress);
  }
}
项目:hadoop    文件:FsDatasetSpi.java   
/**
 * Instantiates the factory class stored under
 * {@code DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY}, using
 * {@code FsDatasetFactory} when the key is unset.
 *
 * @param conf the configuration to read the factory class from
 * @return the configured factory.
 */
public static Factory<?> getFactory(Configuration conf) {
  @SuppressWarnings("rawtypes")
  final Class<? extends Factory> factoryClass = conf.getClass(
      DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
      FsDatasetFactory.class,
      Factory.class);
  final Factory<?> factory = ReflectionUtils.newInstance(factoryClass, conf);
  return factory;
}
项目:hadoop    文件:TestShufflePlugin.java   
/**
 * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
 * as if it came from a 3rd party.
 */
@Test
public void testPluginAbility() {
  try {
    // create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin
    JobConf pluginConf = new JobConf();
    pluginConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
                        TestShufflePlugin.TestShuffleConsumerPlugin.class,
                        ShuffleConsumerPlugin.class);

    Class<? extends ShuffleConsumerPlugin> pluginClass =
      pluginConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, pluginClass);

    // load 3rd party plugin through core's factory method
    ShuffleConsumerPlugin plugin =
      ReflectionUtils.newInstance(pluginClass, pluginConf);
    assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, plugin);
  } catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }
}
项目:hadoop    文件:TaggedInputSplit.java   
/**
 * Restores this tagged split from {@code in}.
 *
 * <p>Read order: the input split class, the fields of a newly created
 * instance of that class, the input format class, then the mapper class.
 *
 * @param in the stream to read from
 * @throws IOException if reading fails
 */
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
  inputSplitClass = (Class<? extends InputSplit>) readClass(in);
  // The wrapped split is re-created from its class, then reads its own state.
  inputSplit = (InputSplit) ReflectionUtils
     .newInstance(inputSplitClass, conf);
  inputSplit.readFields(in);
  inputFormatClass = (Class<? extends InputFormat>) readClass(in);
  mapperClass = (Class<? extends Mapper>) readClass(in);
}
项目:hadoop    文件:Display.java   
/**
 * Opens a sequence file reader on {@code f} and prepares reusable key and
 * value instances plus the input and output buffers.
 *
 * @param f status of the sequence file to read
 * @throws IOException if the reader cannot be opened
 */
public TextRecordInputStream(FileStatus f) throws IOException {
  final Path seqFile = f.getPath();
  final Configuration readerConf = getConf();
  r = new SequenceFile.Reader(readerConf,
      SequenceFile.Reader.file(seqFile));

  // Key and value holders are created from the classes the file declares.
  Class<? extends WritableComparable> keyClass =
      r.getKeyClass().asSubclass(WritableComparable.class);
  key = ReflectionUtils.newInstance(keyClass, readerConf);
  Class<? extends Writable> valueClass =
      r.getValueClass().asSubclass(Writable.class);
  val = ReflectionUtils.newInstance(valueClass, readerConf);

  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
项目:hadoop    文件:MapRunner.java   
/**
 * Instantiates the job's mapper class and decides whether the processed
 * record counter should be incremented.
 *
 * @param job the job configuration
 */
@SuppressWarnings("unchecked")
public void configure(JobConf job) {
  this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
  //increment processed counter only if skipping feature is enabled
  // (max skip records > 0) and auto-increment is switched on
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
    SkipBadRecords.getAutoIncrMapperProcCount(job);
}
项目:hadoop    文件:SequenceFileInputFilter.java   
public FilterRecordReader(Configuration conf)
    throws IOException {
  super();
  // instantiate filter
  filter = (Filter)ReflectionUtils.newInstance(
    conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
}
项目:hadoop    文件:JobConf.java   
/** 
 * Get the user defined {@link WritableComparable} comparator for 
 * grouping keys of inputs to the reduce.
 *
 * <p>When no grouping comparator class is configured, the result of
 * {@code getOutputKeyComparator()} is returned instead.
 * 
 * @return comparator set by the user for grouping values.
 * @see #setOutputValueGroupingComparator(Class) for details.
 */
public RawComparator getOutputValueGroupingComparator() {
  Class<? extends RawComparator> groupingClass = getClass(
    JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
  if (groupingClass != null) {
    return ReflectionUtils.newInstance(groupingClass, this);
  }
  return getOutputKeyComparator();
}
项目:ditb    文件:Compression.java   
/**
 * Loads {@code com.hadoop.compression.lzo.LzoCodec} through the codec class
 * loader and instantiates it with a copy of {@code conf}.
 *
 * @param conf the configuration to copy for the codec
 * @return the new codec
 * @throws RuntimeException if the codec class cannot be found
 */
private CompressionCodec buildCodec(Configuration conf) {
  final Class<?> lzoClass;
  try {
    lzoClass =
        getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
  } catch (ClassNotFoundException e) {
    throw new RuntimeException(e);
  }
  Configuration codecConf = new Configuration(conf);
  return (CompressionCodec) ReflectionUtils.newInstance(lzoClass, codecConf);
}
项目:ditb    文件:CellCreator.java   
public CellCreator(Configuration conf) {
  Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
      VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
      VisibilityExpressionResolver.class);
  this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
  this.visExpResolver.init();
}
项目:hadoop-oss    文件:HttpServer2.java   
/**
 * Serves thread information as UTF-8 plain text and logs it as well.
 * Returns without writing when instrumentation access is not allowed.
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
  throws ServletException, IOException {
  final boolean permitted = HttpServer2.isInstrumentationAccessAllowed(
      getServletContext(), request, response);
  if (!permitted) {
    return;
  }

  response.setContentType("text/plain; charset=UTF-8");
  try (PrintStream body = new PrintStream(
      response.getOutputStream(), false, "UTF-8")) {
    ReflectionUtils.printThreadInfo(body, "");
  }
  ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
}
项目:hadoop    文件:GenericWritable.java   
/**
 * Reads the type byte, instantiates the matching class from
 * {@code getTypes()} and lets the new instance read its fields.
 *
 * @param in the stream to read from
 * @throws IOException if reading fails or the wrapped class cannot be
 *                     instantiated (the underlying failure is attached as
 *                     the cause)
 */
@Override
public void readFields(DataInput in) throws IOException {
  type = in.readByte();
  // The type byte is treated as an unsigned index into getTypes().
  Class<? extends Writable> clazz = getTypes()[type & 0xff];
  try {
    instance = ReflectionUtils.newInstance(clazz, conf);
  } catch (Exception e) {
    // Chain the cause rather than printing it to stderr and dropping it.
    throw new IOException("Cannot initialize the class: " + clazz, e);
  }
  instance.readFields(in);
}
项目:hadoop    文件:InputSampler.java   
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link TotalOrderPartitioner#getPartitionFile}.
 *
 * <p>An existing partition file at the destination is deleted first. The
 * writer is closed even if appending a key fails.
 *
 * @param job the job whose input format, comparator and reduce count are used
 * @param sampler produces the sample keys
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
    throws IOException, ClassNotFoundException, InterruptedException {
  Configuration conf = job.getConfiguration();
  final InputFormat inf = 
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();
  K[] samples = (K[])sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
    (RawComparator<K>) job.getSortComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  // try-with-resources so the writer is released when append() throws
  try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
      conf, dst, job.getMapOutputKeyClass(), NullWritable.class)) {
    NullWritable nullValue = NullWritable.get();
    float stepSize = samples.length / (float) numPartitions;
    int last = -1;
    for(int i = 1; i < numPartitions; ++i) {
      int k = Math.round(stepSize * i);
      // skip forward past keys equal to the previously written split point
      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
        ++k;
      }
      writer.append(samples[k], nullValue);
      last = k;
    }
  }
}
项目:hadoop    文件:OverrideRecordReader.java   
/**
 * Creates a value of the type produced by the child readers.
 *
 * <p>On first use the value class is determined by scanning the children
 * from last to first for one whose value is not {@link NullWritable}. If
 * every child yields {@link NullWritable}, the shared {@link NullWritable}
 * instance is returned.
 *
 * @return a new value instance, or {@code NullWritable.get()}
 */
@SuppressWarnings("unchecked") // Explicit check for value class agreement
public V createValue() {
  if (null == valueclass) {
    Class<?> cls = kids[kids.length -1].createValue().getClass();
    // Bounded scan: stop at index 0 instead of running off the array when
    // every child produces NullWritable.
    for (int i = kids.length -1;
         i >= 0 && cls.equals(NullWritable.class); i--) {
      cls = kids[i].createValue().getClass();
    }
    valueclass = cls.asSubclass(Writable.class);
  }
  if (valueclass.equals(NullWritable.class)) {
    return (V) NullWritable.get();
  }
  return (V) ReflectionUtils.newInstance(valueclass, null);
}