-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added Turbinia scaffolder for generating jobs and tasks. (#60)
- Loading branch information
Showing
8 changed files
with
311 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Turbinia component scaffolder.""" | ||
import datetime | ||
import os | ||
import logging | ||
|
||
from typing import Dict | ||
from typing import Iterator | ||
from typing import Tuple | ||
|
||
from l2tscaffolder.lib import definitions | ||
from l2tscaffolder.lib import mapping_helper | ||
from l2tscaffolder.scaffolders import interface | ||
from l2tscaffolder.scaffolders import manager | ||
|
||
|
||
class TurbiniaJobTaskScaffolder(interface.Scaffolder): | ||
"""The Turbinia base scaffolder interface. | ||
Attributes: | ||
class_name (str): class name of the Turbinia job and task to be generated. | ||
""" | ||
|
||
# The name of the scaffolder plugin. | ||
NAME = 'turbinia_job_and_task' | ||
|
||
# One liner describing what the scaffolder provides. | ||
DESCRIPTION = ( | ||
'Provides a scaffolder to generate a Turbinia job and task plugins.') | ||
|
||
# Define which project this particular scaffolder belongs to. | ||
PROJECT = definitions.DEFINITION_TURBINIA | ||
|
||
# Filenames of templates. | ||
TEMPLATE_JOB_FILE = 'turbinia_job.jinja2' | ||
TEMPLATE_TASK_FILE = 'turbinia_task.jinja2' | ||
|
||
def __init__(self): | ||
"""Initializes the Turbinia scaffolder.""" | ||
super(TurbiniaJobTaskScaffolder, self).__init__() | ||
self._job_path = os.path.join('turbinia', 'jobs') | ||
self._task_path = os.path.join('turbinia', 'workers') | ||
self._mapping_helper = mapping_helper.MappingHelper() | ||
|
||
self.class_name = '' | ||
|
||
def _GenerateJobFile(self) -> str: | ||
"""Generates the job job file.""" | ||
return self._mapping_helper.RenderTemplate( | ||
self.TEMPLATE_JOB_FILE, self.GetJinjaContext()) | ||
|
||
def _GenerateTaskFile(self) -> str: | ||
"""Generates the task file.""" | ||
return self._mapping_helper.RenderTemplate( | ||
self.TEMPLATE_TASK_FILE, self.GetJinjaContext()) | ||
|
||
def GetInitFileChanges(self) -> Iterator[Tuple[str, str]]: | ||
"""Generate a list of init files that need changing and the changes to them. | ||
Yields: | ||
Tuple[str, str]: path to the init file and the entry to add to it. | ||
""" | ||
python_init_path = self._job_path.replace(os.sep, '.') | ||
job_string = 'from {0:s} import {1:s}\n'.format( | ||
python_init_path, self._output_name) | ||
job_init_path = os.path.join(self._job_path, '__init__.py') | ||
yield job_init_path, job_string | ||
|
||
def GetJinjaContext(self) -> Dict[str, object]: | ||
"""Returns a dict that can be used as a context for Jinja2 templates. | ||
Returns: | ||
dict: containing: | ||
str: name of Jinja argument. | ||
object: Jinja argument value. | ||
""" | ||
context = super(TurbiniaJobTaskScaffolder, self).GetJinjaContext() | ||
context['class_name'] = self.class_name | ||
context['plugin_name'] = self._output_name | ||
time_now = datetime.datetime.utcnow() | ||
context['year'] = time_now.year | ||
|
||
return context | ||
|
||
def GenerateFiles(self) -> Iterator[Tuple[str, str]]: | ||
"""Generates all the files required for a Turbinia component. | ||
Yields: | ||
list[tuple]: containing: | ||
str: file name. | ||
str: file content. | ||
""" | ||
plugin_name = '{0:s}.py'.format(self._output_name) | ||
|
||
self.class_name = self._mapping_helper.GenerateClassName( | ||
self._output_name) | ||
|
||
try: | ||
job_path = os.path.join(self._job_path, plugin_name) | ||
job_content = self._GenerateJobFile() | ||
yield job_path, job_content | ||
except SyntaxError as exception: | ||
logging.error(( | ||
'Syntax error while attempting to generate component, error ' | ||
'message: {0!s}').format(exception)) | ||
|
||
try: | ||
task_path = os.path.join(self._task_path, plugin_name) | ||
task_content = self._GenerateTaskFile() | ||
yield task_path, task_content | ||
except SyntaxError as exception: | ||
logging.error(( | ||
'Syntax error while attempting to generate component, error ' | ||
'message: {0!s}').format(exception)) | ||
|
||
|
||
manager.ScaffolderManager.RegisterScaffolder(TurbiniaJobTaskScaffolder) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright {{ year }} Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Job to execute {{ plugin_name|lower|replace('_',' ')}} task.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
# TODO: import from turbinia.evidence the needed files, eg: | ||
# from turbinia.evidence import FilteredTextFile | ||
from turbinia.jobs import interface | ||
from turbinia.jobs import manager | ||
from turbinia.workers.{{ plugin_name }} import {{ class_name }}Task | ||
|
||
|
||
class {{ class_name }}Job(interface.TurbiniaJob): | ||
"""Need to add some description here.""" | ||
|
||
# The types of evidence that this Job will process | ||
# TODO: Fill this out. | ||
evidence_input = [] | ||
evidence_output = [] | ||
|
||
NAME = '{{ class_name }}Job' | ||
|
||
def create_tasks(self, evidence): | ||
"""Create tasks. | ||
|
||
Args: | ||
evidence: List of evidence object to process | ||
|
||
Returns: | ||
A list of tasks to schedule. | ||
""" | ||
# TODO: Fill in the tasks. | ||
tasks = [{{ class_name}}Task() for _ in evidence] | ||
return tasks | ||
|
||
|
||
manager.JobsManager.RegisterJob({{ class_name }}Job) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright {{ year }} Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Task to execute {{ plugin_name|lower|replace('_',' ')}}.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
import os | ||
|
||
# TODO: import from turbinia.evidence the needed files, eg: | ||
# from turbinia.evidence import FilteredTextFile | ||
from turbinia.workers import TurbiniaTask | ||
|
||
|
||
class {{ class_name }}Task(TurbiniaTask): | ||
"""Class for {{ plugin_name }}.""" | ||
|
||
def run(self, evidence, result): | ||
"""TODO: Add docstring. | ||
|
||
Args: | ||
evidence (Evidence object): The evidence we will process | ||
result (TurbiniaTaskResult): The object to place task results into. | ||
|
||
Returns: | ||
TurbiniaTaskResult object. | ||
""" | ||
# TODO: Fill in the output evidence. | ||
output_evidence = None | ||
|
||
# TODO: Fill in the command. | ||
cmd = '' | ||
|
||
result.log('Running [{0:s}]'.format(cmd)) | ||
self.execute( | ||
cmd, result, new_evidence=[output_evidence], close=True, shell=True) | ||
|
||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2018 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Job to execute secret processing task.""" | ||
|
||
from __future__ import unicode_literals | ||
|
||
# TODO: import from turbinia.evidence the needed files, eg: | ||
# from turbinia.evidence import FilteredTextFile | ||
from turbinia.jobs import interface | ||
from turbinia.jobs import manager | ||
from turbinia.workers.secret_processing import SecretProcessingTask | ||
|
||
|
||
class SecretProcessingJob(interface.TurbiniaJob): | ||
"""Need to add some description here.""" | ||
|
||
# The types of evidence that this Job will process | ||
# TODO: Fill this out. | ||
evidence_input = [] | ||
evidence_output = [] | ||
|
||
NAME = 'SecretProcessingJob' | ||
|
||
def create_tasks(self, evidence): | ||
"""Create tasks. | ||
Args: | ||
evidence: List of evidence object to process | ||
Returns: | ||
A list of tasks to schedule. | ||
""" | ||
# TODO: Fill in the tasks. | ||
tasks = [SecretProcessingTask() for _ in evidence] | ||
return tasks | ||
|
||
|
||
manager.JobsManager.RegisterJob(SecretProcessingJob) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Tests for the Turbinia job and task scaffolder.""" | ||
import unittest | ||
|
||
from l2tscaffolder.scaffolders import turbinia | ||
|
||
|
||
class TurbiniaJobTaskScaffolderTest(unittest.TestCase): | ||
"""Test class for the Turbinia job and task scaffolder.""" | ||
|
||
maxDiff = None | ||
|
||
def testTurbiniaJobTaskScaffolder(self): | ||
"""Test the Turbinia job and task scaffolder.""" | ||
scaffolder = turbinia.TurbiniaJobTaskScaffolder() | ||
scaffolder.SetOutputName('secret_processing') | ||
|
||
file_copy_paths = [x for _, x in scaffolder.GetFilesToCopy()] | ||
self.assertEqual(file_copy_paths, []) | ||
|
||
files_generated = dict(scaffolder.GenerateFiles()) | ||
|
||
expected_files = frozenset([ | ||
'turbinia/jobs/secret_processing.py', | ||
'turbinia/workers/secret_processing.py']) | ||
self.assertEqual(set(files_generated.keys()), expected_files) | ||
|
||
expected_init_files = frozenset([ | ||
'turbinia/jobs/__init__.py']) | ||
init_generated = dict(scaffolder.GetInitFileChanges()) | ||
self.assertEqual(set(init_generated.keys()), expected_init_files) | ||
|
||
with open('test_data/turbinia_job_output.py', 'r') as fh: | ||
expected_parser_content = fh.read() | ||
self.assertEqual( | ||
expected_parser_content, | ||
files_generated['turbinia/jobs/secret_processing.py']) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |