Pig Job Auto Tuning Integration #290
@mkumar1984 , thanks for the contribution!
I have two overall comments:
- There should be much more code reuse, and more use of existing constants rather than redefining them.
- I think the tests seem a little bit lacking. I would love to see something that mocks out the HTTP request to verify expected input, return expected output, and see that configs are updated accordingly.
#pig.enable.tuning=false
#tuning.api.end.point=http://hostname:8080/rest/getCurrentRunParameters
#auto.tuning.job.type=PIG
What's going on here? Why are we adding commented-out properties?
These are kept only for reference; users who want to enable tuning can uncomment them. I have uncommented two of them.
public static final String JOB_ENABLE_TUNING = "job.enable.tuning";

//Internal flag for seeing whether this was a retry of the job after failure because of auto tuning
public static final String AUTO_TUNING_RETRY = "auto.tuning.retry";
Is there no cleaner way to pass this around than as a job prop?
Also, if this is an internal flag, it should not be public.
This variable was not required here.
All other parameters are passed through job props, so I thought of doing the same.
}
if(enableTuning)
{
HADOOP_SECURE_PIG_WRAPPER = HadoopTuningSecurePigWrapper.class.getName();
We should not be modifying a static variable here. IIUC this could affect other running jobs. `HADOOP_SECURE_PIG_WRAPPER` should really be a constant (`final`), and there should be a new field like `private final String securePigWrapper` which contains the wrapper to use for this specific instance.
Yes, you are right here. Great catch. Done.
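The suggestion in this thread can be sketched roughly as follows. This is only an illustration, not the code that was merged: the class name `PigJobSketch`, the constant names, and the plain string wrapper names are placeholders standing in for the real Azkaban types.

```java
// Sketch only: PigJobSketch and the constants below are hypothetical stand-ins.
final class PigJobSketch {
  // Constants stay immutable, so one job cannot change what another job sees.
  static final String DEFAULT_WRAPPER = "HadoopSecurePigWrapper";
  static final String TUNING_WRAPPER = "HadoopTuningSecurePigWrapper";

  // The wrapper is chosen once per job instance instead of mutating a static.
  private final String securePigWrapper;

  PigJobSketch(boolean enableTuning) {
    this.securePigWrapper = enableTuning ? TUNING_WRAPPER : DEFAULT_WRAPPER;
  }

  String getSecurePigWrapper() {
    return securePigWrapper;
  }
}
```

With this shape, a tuning-enabled job and a regular job running in the same JVM each keep their own wrapper name.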
HADOOP_SECURE_PIG_WRAPPER = HadoopSecurePigWrapper.class.getName();
if (jobProps.containsKey(JOB_ENABLE_TUNING) && jobProps.containsKey(PIG_ENABLE_TUNING)) {
enableTuning = jobProps.getBoolean(JOB_ENABLE_TUNING) && jobProps.getBoolean(PIG_ENABLE_TUNING);
}else
Style does not match convention. `}else` -> `} else {`, `if(` -> `if (`, and the opening curly brace should appear on the same line as the `if`.
Done.
@@ -248,6 +269,7 @@ protected String getMainArguments() {
classPath.add(HadoopConfigurationInjector.getPath(getJobProps(),
getWorkingDirectory()));
remove unnecessary whitespace changes
Done.
props.put(HadoopConfigurationInjector.INJECT_PREFIX + "mapreduce.reduce.memory.mb", "2048");
props.put(HadoopConfigurationInjector.INJECT_PREFIX + "mapreduce.map.memory.mb", "2048");

props.put(CommonJobProperties.EXEC_ID, CommonJobProperties.EXEC_ID);
Could use a loop here to eliminate a lot of code
Probably create an array of all of these props you want to add, iterate through that array here and put them into `props`, then iterate through it again at the end and verify that it hasn't changed
Done.
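One way the loop-based approach could look is sketched below. It uses a plain `Map` as a stand-in for Azkaban's `Props`, and the class and method names (`PropsLoopSketch`, `populate`, `unchanged`) are hypothetical; only the property keys and values are taken from the snippets quoted in this review.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: a Map stands in for Props; names here are hypothetical.
final class PropsLoopSketch {
  // Single source of truth for the key/value pairs the test cares about.
  static final String[][] EXPECTED = {
      {"mapreduce.reduce.memory.mb", "2048"},
      {"mapreduce.map.memory.mb", "2048"},
      {"mapreduce.task.io.sort.mb", "100"},
  };

  // First pass: put every expected pair into the props.
  static Map<String, String> populate() {
    Map<String, String> props = new LinkedHashMap<>();
    for (String[] pair : EXPECTED) {
      props.put(pair[0], pair[1]);
    }
    return props;
  }

  // Second pass: verify that none of the expected pairs has changed.
  static boolean unchanged(Map<String, String> props) {
    for (String[] pair : EXPECTED) {
      if (!pair[1].equals(props.get(pair[0]))) {
        return false;
      }
    }
    return true;
  }
}
```

Keeping the pairs in one array means that adding a property to the test is a one-line change, and the setup and verification can never drift apart.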
public class TestHadoopTuningConfigurationInjector {

@Test
public void testTuningErrorHandler() throws IOException {
Can you explain how the tuning error handler is being used here? What is actually being tested?
Typo in method name. Changed the name.
@@ -0,0 +1,38 @@
package azkaban.jobtype;
license
Done.
@@ -0,0 +1,56 @@
package azkaban.jobtype;
license
Done.
@Test
public void testGetHadoopProperties() throws IOException {
Props props = new Props();
props.put(HadoopConfigurationInjector.INJECT_PREFIX + "mapreduce.task.io.sort.mb", "100");
use Hadoop constants here
Done.
@xkrogen covered my biggest concerns: the duplicate code in HadoopConfigurationInjector and HadoopSecurePigWrapper. Also, I do not think this follows the Azkaban style guide in a number of places. Please refer to https://github.com/azkaban/azkaban/blob/master/CONTRIBUTING.md
* using injectResources() so that they are included in any Configuration
* constructed.
*/
public class HadoopTuningConfigurationInjector {
+1
It seems like almost all of the code is duplicated apart from a few lines.
I have addressed all of the review comments except writing a test that uses a mocked API call. That will take some time; I need to explore a bit.
Looking really good after the last set of changes; thanks for incorporating all of my feedback! This time around it is mostly just small style nits. The only other outstanding thing is that I would like to see a fuller test with some HTTP endpoint mocking, which I believe you said you're working on already.
@@ -50,6 +50,7 @@

public static final String WORKFLOW_ID_SEPERATOR = "$";
private static final String WORKFLOW_ID_CONFIG = "yarn.workflow.id";
Seems like this and a few other style changes in this class are not related to this change. Typically I discourage style changes which are not a part of the core of the change. Not sure if this is also the recommended approach for Azkaban code.
Not sure how this white space gets added. Removed for this case.
* using injectResources() so that they are included in any Configuration
* constructed.
*/
public class HadoopTuningConfigurationInjector {
Nice! Looks great
jobTryCount++;
firstTry = false;
props = Props.clone(initialJobprops);
props.put(TuningCommonConstants.AUTO_TUNING_RETRY, retry + "");
`String.valueOf()` instead of `+ ""`?
Done.
/**
* In case if job is failed because of auto tuning parameters, it will be retried with the best parameters
* we have seen so far. Maximum number of try is 2.
"Maximum number of try is 2" => "Maximum number of tries is 2 by default"
Done.
isTuningError = true;
}
}
reader.close();
This should be in a `finally` clause. Ideally, use a try-with-resources statement.
Done. Used a try-with-resources statement. Thanks for the suggestion. Didn't know about this earlier.
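For readers unfamiliar with the construct, a minimal sketch of a log scan using try-with-resources is shown below. It is not the PR's actual code: the class name `LogScanSketch`, the method name, and the single `OutOfMemoryError` pattern are assumptions made for illustration, since the real error patterns are not shown in this thread.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.regex.Pattern;

// Sketch only: the pattern and all names here are hypothetical.
final class LogScanSketch {
  static final Pattern ERROR_PATTERN = Pattern.compile("OutOfMemoryError");

  // Returns true if any line of the log matches the pattern.
  static boolean containsTuningError(Reader source) {
    // The reader is closed automatically on every exit path,
    // including the early return and any exception.
    try (BufferedReader reader = new BufferedReader(source)) {
      String line;
      while ((line = reader.readLine()) != null) {
        if (ERROR_PATTERN.matcher(line).find()) {
          return true;
        }
      }
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Compared with an explicit `reader.close()` after the loop, this form cannot leak the reader when `readLine()` throws.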
retryCount++;
log.info("Calling get current run parameters. Try count " + retryCount);
params = getCurrentRunParameters(props);
noException = true;
I think it would be cleaner to do away with the `noException` variable and simply replace this line with `break`
Done.
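The shape of the loop after this change might look roughly like the sketch below. The surrounding method is not shown in the review, so the class name, the `MAX_TRIES` limit, and the use of a `Supplier` in place of `getCurrentRunParameters(props)` are all assumptions.

```java
import java.util.Map;
import java.util.function.Supplier;

// Sketch only: MAX_TRIES and the Supplier-based API call are hypothetical.
final class RetryLoopSketch {
  static final int MAX_TRIES = 3;

  // Returns the fetched parameters, or null if every attempt failed.
  static Map<String, String> fetchWithRetry(Supplier<Map<String, String>> api) {
    Map<String, String> params = null;
    int retryCount = 0;
    while (retryCount < MAX_TRIES) {
      retryCount++;
      try {
        params = api.get();
        break; // success: leave the loop directly, no noException flag needed
      } catch (RuntimeException e) {
        // A real implementation would log the failure before the next try.
      }
    }
    return params;
  }
}
```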
}

/**
*This method is for getting current run parameters from Dr Elephant
Spacing in this line of Javadoc, and in general the Javadocs in this class could be filled out a bit more completely (e.g. there is no description of the arguments)
Done.
CloseableHttpClient httpClient = HttpClients.createDefault();

HttpResponse response = null;
String endPoint = String.format("%s", props.get(TuningCommonConstants.TUNING_API_END_POINT));
What is being achieved with this line?
Earlier this used multiple string concatenations. I removed the String.format and now use the property directly.
userProps.put(HadoopConfigurationInjector.INJECT_PREFIX + MRJobConfig.REDUCE_MEMORY_MB, "2048");
userProps.put(HadoopConfigurationInjector.INJECT_PREFIX + MRJobConfig.MAP_MEMORY_MB, "2048");

azkabanProps.put(CommonJobProperties.EXEC_ID, CommonJobProperties.EXEC_ID);
I think I mentioned this in the last review, but don't see my comment / a response - can we set all of these properties via a loop?
Done.
* This class is responsible for finding whether failure is because of tuning parameters.
* This try to search predefined patterns in the log.
*/
public class TuningErrorHandler {
This class doesn't actually handle tuning errors, just detects them. Maybe it should be called `TuningErrorDetector`?
Changed to TuningErrorDetector.
@xkrogen and @inramana In case you are not aware yet, I wrote a tip for how to update copyright automatically. See https://github.com/azkaban/azkaban/wiki/Developer-Tools-and-Tips#use-intellij-to-create-and-update-copyright-automatically
Just reduced verbosity:
https://www.javaworld.com/article/2074080/core-java/jdk-7--the-diamond-operator.html
On Mon, May 21, 2018 at 1:42 PM mkumar1984 wrote:
In plugins/jobtype/src/azkaban/jobtype/tuning/TuningErrorHandler.java:
> +import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class is responsible for finding whether failure is because of tuning parameters.
+ * This try to search predefined patterns in the log.
+ */
+public class TuningErrorHandler {
+ private Logger log = Logger.getRootLogger();
+
+ private static List<Pattern> errorPatterns = new ArrayList<Pattern>();
Done. BTW what's the difference between two.
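As a small illustration of the answer given in this exchange, the two declarations below create the same kind of list; the diamond form only lets the compiler infer the type argument. The class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Sketch only: names are hypothetical; both fields hold an ArrayList of Pattern.
final class DiamondSketch {
  // Explicit form: the type argument is repeated on the right-hand side.
  static final List<Pattern> VERBOSE = new ArrayList<Pattern>();

  // Diamond form (Java 7+): <Pattern> is inferred from the declared type.
  static final List<Pattern> CONCISE = new ArrayList<>();
}
```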
Added a test case in TestTuningParameterUtils.java that mocks the API and tests updateAutoTuningParameters. This should address all of the review comments. Let me know if there are any other comments.
This is looking really good! Thanks for addressing our concerns thoroughly about the new tests. I left a few more comments, mostly minor. Also, the new mock test looks great, but can we also add one for the failure case (return a 500, 404, something like that)?
private static File pigLogFile;

private static Props props;
Whoops, sounds good.
if (HadoopSecureWrapperUtils.shouldProxy(jobProps)) {
String tokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
UserGroupInformation proxyUser = HadoopSecureWrapperUtils.setupProxyUser(jobProps, tokenFile, logger);
proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
In that case, this could be greatly simplified like proxyUser.doAs(() -> runPigJob(args));
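The difference between the anonymous class and the lambda form can be sketched without Hadoop on the classpath. In the example below, `doAs` is a hypothetical stand-in that accepts only a `java.security.PrivilegedExceptionAction`; it is not Hadoop's `UserGroupInformation.doAs`, and `runPigJob` is a placeholder that just returns the argument count.

```java
import java.security.PrivilegedExceptionAction;

// Sketch only: doAs and runPigJob are hypothetical stand-ins.
final class DoAsLambdaSketch {
  // Runs the action and wraps any checked exception.
  static <T> T doAs(PrivilegedExceptionAction<T> action) {
    try {
      return action.run();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  static Integer runPigJob(String[] args) {
    return args.length;
  }

  // Anonymous inner class form, as in the snippet under review.
  static Integer viaAnonymousClass(final String[] args) {
    return doAs(new PrivilegedExceptionAction<Integer>() {
      @Override
      public Integer run() {
        return runPigJob(args);
      }
    });
  }

  // Equivalent lambda form.
  static Integer viaLambda(String[] args) {
    return doAs(() -> runPigJob(args));
  }
}
```

One caveat: if the real method is overloaded for both `PrivilegedAction` and `PrivilegedExceptionAction`, the compiler may report the lambda call as ambiguous, in which case a cast to the intended interface is needed.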
props.put(HadoopConfigurationInjector.INJECT_PREFIX + MRJobConfig.MAP_MEMORY_MB, "2048");
props.put(TuningCommonConstants.TUNING_API_END_POINT, "dummy_api_end_point");

Props passingProps=new Props(null, props);
space around =
Done.
Assert.assertEquals(MRJobConfig.IO_SORT_FACTOR + " value not correct",
passingProps.getString(HadoopConfigurationInjector.INJECT_PREFIX + MRJobConfig.IO_SORT_FACTOR), "10");

passingProps=new Props(null, props);
space around =
Done.
passingProps.getString(HadoopConfigurationInjector.INJECT_PREFIX + MRJobConfig.MAP_MEMORY_MB), "2048");
Assert.assertEquals(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + " value not correct",
passingProps.getString(HadoopConfigurationInjector.INJECT_PREFIX + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT), "0.9");
unnecessary blank
Done.
CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class);
PowerMockito.when(HttpClients.createDefault()).thenReturn(httpClient);
PowerMockito.when(EntityUtils.toString(Matchers.any(HttpEntity.class))).thenReturn(expectedJSON);
Rather than mocking out a static method, can you simply have your HTTP client return a `StringEntity` with a string defined by you? See http://grepcode.com/file/repo1.maven.org/maven2/org.apache.httpcomponents/httpcore/4.1/org/apache/http/entity/StringEntity.java Then `EntityUtils` should be able to stringify it properly.
Done.
PowerMockito.mockStatic(EntityUtils.class);

CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class);
PowerMockito.when(HttpClients.createDefault()).thenReturn(httpClient);
Though `PowerMockito`'s static mocking abilities are definitely powerful, it also relies on rewriting bytecode to redirect calls somewhat magically. My inclination is that this should only be used as a last resort, for example when dealing with library code that you cannot easily modify. I would prefer to see the `updateAutoTuningParameters()` method refactored slightly to allow for you to pass in your own `HttpClient`, avoiding this type of mocking. Let me know if you disagree.
Agree. Removing PowerMockito's usage. Changed TuningParameterUtils to have all methods as static.
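The idea of passing the client in, rather than mocking a static factory, can be sketched without any HTTP or mocking library. Everything below is an assumption made for illustration: `ParameterSource` is a hypothetical seam standing in for the HTTP client, `applySuggestedParameters` is not the real `updateAutoTuningParameters` signature, and a `Map` stands in for `Props`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: all names here are hypothetical stand-ins.
final class InjectableClientSketch {
  // Seam that a test can implement directly instead of mocking statics.
  interface ParameterSource {
    Map<String, String> fetch(String endpoint) throws Exception;
  }

  // Overlays suggested parameters on the defaults; keeps defaults on failure.
  static Map<String, String> applySuggestedParameters(
      Map<String, String> defaults, String endpoint, ParameterSource source) {
    Map<String, String> merged = new HashMap<>(defaults);
    try {
      merged.putAll(source.fetch(endpoint));
    } catch (Exception e) {
      // API unavailable: fall back to the default configuration.
    }
    return merged;
  }
}
```

A test can then pass a lambda that returns a canned response for the success case and one that throws for the failure case (for example, to simulate a 500 or 404), with no bytecode rewriting involved.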
LGTM, great work @mkumar1984 !
@mkumar1984 can you resolve conflicts?
I believe the code has been moved to azkaban main repo.
This change integrates Pig jobs with the auto tuning framework (https://github.com/linkedin/dr-elephant/wiki/Auto-Tuning). The auto tuning framework provides a way to automatically tune Hadoop jobs. Currently it supports Pig jobs for resource usage optimization. Integration is straightforward: it requires calling the getCurrentRunParameters API before running any job. The API responds with a parameter suggestion, and these parameters should be used for the configuration of the job.
Design
Currently, configurations are injected using HadoopConfigurationInjector. We have written a new HadoopTuningConfigurationInjector. The new injector injects the default configuration first, then calls the getCurrentRunParameters API to get the parameters for the current run from the AutoTuning framework and injects those parameters into the configuration of the job.
Failure Handling
-- API down: If the API is down, the fallback is to let the job use the default configuration.
-- Other failure reasons: If the job fails for another reason, we identify whether the failure was caused by auto tuning by analyzing the log, and then retry the getCurrentRunParameters API with the isRetry flag enabled. In this case getCurrentRunParameters returns the best parameters from those already tried.
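The retry flow described in the second bullet can be sketched as follows. This is a simplified model, not the wrapper's actual code: `JobRunner` and `FailureClassifier` are hypothetical interfaces standing in for running the Pig job and analyzing its log, and `maxTries` corresponds to the retry limit mentioned in the review (2 by default).

```java
// Sketch only: the interfaces and method names are hypothetical.
final class TuningRetryFlowSketch {
  // Runs the job; isRetry mirrors the isRetry flag sent to the API.
  interface JobRunner {
    boolean run(boolean isRetry);
  }

  // Decides, e.g. from the job log, whether tuning caused the failure.
  interface FailureClassifier {
    boolean failedBecauseOfTuning();
  }

  // Returns true if the job eventually succeeded.
  static boolean runWithTuningRetry(JobRunner job, FailureClassifier classifier, int maxTries) {
    boolean isRetry = false;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      if (job.run(isRetry)) {
        return true;
      }
      if (!classifier.failedBecauseOfTuning()) {
        return false; // unrelated failure: retrying with other parameters will not help
      }
      isRetry = true; // next attempt asks for the best parameters seen so far
    }
    return false;
  }
}
```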
We will create a new wrapper, HadoopTuneInSecurePigWrapper, similar to HadoopSecurePigWrapper, which will have this new flow of injecting parameters, handling failures, and calling the status API. Based on the flag auto_tuning_enabled (default false), we can choose which wrapper to use (in HadoopPigJob). Another approach could be to use the auto_tuning_enabled flag to decide on the flow, without creating HadoopTuneInSecurePigWrapper.
We will avoid overriding configuration in the script, and will ask users not to keep any configuration inside the script when enabling AutoTuning.
Manual Test Cases
AzkabanPigJobTypeTestCases.xlsx